This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5d7fc81e67 [INLONG-8643][Sort] Support Iceberg source (#8818)
5d7fc81e67 is described below
commit 5d7fc81e6761bba8289b8f862af92eacc487771a
Author: vernedeng <[email protected]>
AuthorDate: Fri Sep 1 14:31:40 2023 +0800
[INLONG-8643][Sort] Support Iceberg source (#8818)
---
.../resource/sort/DefaultSortConfigOperator.java | 1 -
.../sort/protocol/constant/IcebergConstant.java | 13 +
.../inlong/sort/protocol/node/ExtractNode.java | 2 +
.../org/apache/inlong/sort/protocol/node/Node.java | 2 +
.../IcebergExtracNode.java} | 107 +--
.../sort/protocol/node/load/IcebergLoadNode.java | 12 +-
.../parser/Iceberg2StarRocksSqlParserTest.java | 133 ++++
inlong-sort/sort-flink/sort-flink-v1.13/pom.xml | 3 +-
inlong-sort/sort-flink/sort-flink-v1.15/pom.xml | 5 -
.../sort-connectors/iceberg/pom.xml | 139 ++++
.../apache/inlong/sort/iceberg/FlinkCatalog.java | 812 +++++++++++++++++++++
.../inlong/sort/iceberg/FlinkCatalogFactory.java | 216 ++++++
.../sort/iceberg/FlinkDynamicTableFactory.java | 205 ++++++
.../sort/iceberg/FlinkEnvironmentContext.java | 35 +
.../org.apache.flink.table.factories.Factory | 18 +
.../org.apache.flink.table.factories.TableFactory | 16 +
.../sort-flink-v1.15/sort-connectors/pom.xml | 1 +
licenses/inlong-sort-connectors/LICENSE | 8 +
pom.xml | 4 +-
19 files changed, 1666 insertions(+), 66 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 97683f1ac3..37429cf2a5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -253,5 +253,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         groupInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
groupInfo.getExtList().add(extInfo);
}
-
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 70ea0d5158..676f7f4435 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -22,6 +22,19 @@ package org.apache.inlong.sort.protocol.constant;
*/
public class IcebergConstant {
+ public static final String DEFAULT_CATALOG_NAME = "ICEBERG_HIVE";
+ public static final String CONNECTOR_KEY = "connector";
+ public static final String CONNECTOR = "iceberg-inlong";
+ public static final String DATABASE_KEY = "catalog-database";
+ public static final String DEFAULT_DATABASE_KEY = "default-database";
+ public static final String TABLE_KEY = "catalog-table";
+ public static final String CATALOG_TYPE_KEY = "catalog-type";
+ public static final String CATALOG_NAME_KEY = "catalog-name";
+ public static final String URI_KEY = "uri";
+ public static final String WAREHOUSE_KEY = "warehouse";
+ public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+ public static final String STREAMING = "streaming";
+
/**
* Iceberg supported catalog type
*/
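For reference, the new keys are what the extract/load nodes use when they build Flink table options. Below is a minimal sketch of the resulting option map; the database, table, and metastore URI values are placeholders, not taken from this commit:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.protocol.constant.IcebergConstant;

    public class IcebergOptionsSketch {
        public static Map<String, String> exampleOptions() {
            Map<String, String> options = new HashMap<>();
            options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR); // "connector" -> "iceberg-inlong"
            options.put(IcebergConstant.DATABASE_KEY, "test_db");                  // "catalog-database" (placeholder)
            options.put(IcebergConstant.TABLE_KEY, "test_table");                  // "catalog-table" (placeholder)
            options.put(IcebergConstant.CATALOG_TYPE_KEY, "HIVE");                 // "catalog-type"
            options.put(IcebergConstant.CATALOG_NAME_KEY, IcebergConstant.DEFAULT_CATALOG_NAME);
            options.put(IcebergConstant.URI_KEY, "thrift://localhost:9083");       // placeholder metastore URI
            options.put(IcebergConstant.STREAMING, "true");                        // read new snapshots continuously
            return options;
        }
    }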
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 3c27b6a407..543f8cf3be 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -64,6 +65,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = RedisExtractNode.class, name = "redisExtract"),
         @JsonSubTypes.Type(value = DorisExtractNode.class, name = "dorisExtract"),
         @JsonSubTypes.Type(value = HudiExtractNode.class, name = "hudiExtract"),
+        @JsonSubTypes.Type(value = IcebergExtracNode.class, name = "icebergExtract"),
})
@Data
@NoArgsConstructor
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index ecd704815b..f755f439d7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -78,6 +79,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = RedisExtractNode.class, name = "redisExtract"),
         @JsonSubTypes.Type(value = DorisExtractNode.class, name = "dorisExtract"),
         @JsonSubTypes.Type(value = HudiExtractNode.class, name = "hudiExtract"),
+        @JsonSubTypes.Type(value = IcebergExtracNode.class, name = "icebergExtract"),
         @JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
@JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
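With the subtype registered in both ExtractNode and Node, Jackson can round-trip the new node through the base type. A minimal sketch, assuming the hierarchy's usual name-based type discriminator declared on Node:

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.inlong.sort.protocol.node.Node;
    import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;

    public class NodeRoundTripSketch {
        public static Node roundTrip(IcebergExtracNode node) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Serializing through the base type writes the "icebergExtract" type id;
            // deserializing dispatches on that id back to IcebergExtracNode.
            String json = mapper.writeValueAsString(node);
            return mapper.readValue(json, Node.class);
        }
    }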
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
similarity index 53%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
index 257290119a..b876d96aff 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
@@ -15,22 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.node.load;
+package org.apache.inlong.sort.protocol.node.extract;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.constant.IcebergConstant;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
-import org.apache.inlong.sort.protocol.enums.FilterStrategy;
-import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
@@ -41,13 +35,14 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
-@JsonTypeName("icebergLoad")
-@Data
-@NoArgsConstructor
+/**
+ * Iceberg extract node for extracting data from Iceberg.
+ */
 @EqualsAndHashCode(callSuper = true)
-public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializable {
-
-    private static final long serialVersionUID = -1L;
+@JsonTypeName("icebergExtract")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+public class IcebergExtracNode extends ExtractNode implements Serializable {
@JsonProperty("tableName")
@Nonnull
@@ -57,67 +52,77 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializa
     @Nonnull
     private String dbName;
- @JsonProperty("primaryKey")
- private String primaryKey;
-
@JsonProperty("catalogType")
private IcebergConstant.CatalogType catalogType;
+ @Nullable
@JsonProperty("uri")
private String uri;
@JsonProperty("warehouse")
private String warehouse;
-    @JsonCreator
-    public IcebergLoadNode(@JsonProperty("id") String id,
-            @JsonProperty("name") String name,
-            @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
-            @JsonProperty("filters") List<FilterFunction> filters,
-            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
-            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
-            @JsonProperty("properties") Map<String, String> properties,
+    @JsonProperty("catalogName")
+    private String catalogName;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonProperty("startSnapShotId")
+    @Nullable
+    private Long startSnapShotId;
+
+    public IcebergExtracNode(
+            @Nonnull @JsonProperty("id") String id,
+            @Nonnull @JsonProperty("name") String name,
+            @Nonnull @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+            @Nullable @JsonProperty("uri") String uri,
+            @Nullable @JsonProperty("warehouse") String warehouse,
             @Nonnull @JsonProperty("dbName") String dbName,
             @Nonnull @JsonProperty("tableName") String tableName,
-            @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
-            @JsonProperty("uri") String uri,
-            @JsonProperty("warehouse") String warehouse) {
-        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
-        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
-        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
-        this.primaryKey = primaryKey;
-        this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
+            @Nullable @JsonProperty("catalogName") String catalogName,
+            @JsonProperty("primaryKey") String primaryKey,
+            @Nullable @JsonProperty("startSnapShotId") Long startSnapShotId,
+            @Nullable @JsonProperty("properties") Map<String, String> properties) {
+        super(id, name, fields, watermarkField, properties);
         this.uri = uri;
         this.warehouse = warehouse;
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.catalogName = catalogName == null ? IcebergConstant.DEFAULT_CATALOG_NAME : catalogName;
+        this.primaryKey = primaryKey;
+        this.startSnapShotId = startSnapShotId;
+        this.catalogType = catalogType == null ? IcebergConstant.CatalogType.HIVE : catalogType;
+    }
+
+    @Override
+    public String genTableName() {
+        return String.format("iceberg_table_%s", getId());
}
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "iceberg-inlong");
- // for test sink.ignore.changelog
- // options.put("sink.ignore.changelog", "true");
- options.put("catalog-database", dbName);
- options.put("catalog-table", tableName);
- options.put("default-database", dbName);
- options.put("catalog-type", catalogType.name());
- options.put("catalog-name", catalogType.name());
+ options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
+ options.put(IcebergConstant.DATABASE_KEY, dbName);
+ options.put(IcebergConstant.TABLE_KEY, tableName);
+ options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
+ options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName);
+ options.put(IcebergConstant.STREAMING, "true");
if (null != uri) {
- options.put("uri", uri);
+ options.put(IcebergConstant.URI_KEY, uri);
}
if (null != warehouse) {
- options.put("warehouse", warehouse);
+ options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
+ }
+ if (null != startSnapShotId) {
+            options.put(IcebergConstant.START_SNAPSHOT_ID, startSnapShotId.toString());
}
return options;
}
- @Override
- public String genTableName() {
- return tableName;
- }
-
@Override
public String getPrimaryKey() {
return primaryKey;
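Putting the pieces together, constructing the new extract node and asking it for its table options looks roughly like this; the field list, database, table, and URI values are placeholders:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.inlong.sort.formats.common.LongFormatInfo;
    import org.apache.inlong.sort.protocol.FieldInfo;
    import org.apache.inlong.sort.protocol.constant.IcebergConstant;
    import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;

    public class IcebergExtracNodeSketch {
        public static Map<String, String> exampleOptions() {
            List<FieldInfo> fields = Collections.singletonList(new FieldInfo("id", new LongFormatInfo()));
            IcebergExtracNode node = new IcebergExtracNode(
                    "1", "iceberg_source", fields, null,        // id, name, fields, watermark
                    "thrift://localhost:9083", null,            // uri (placeholder), warehouse
                    "test_db", "test_table",                    // dbName, tableName (placeholders)
                    IcebergConstant.CatalogType.HIVE, null,     // catalogType, catalogName -> default
                    null, null, null);                          // primaryKey, startSnapShotId, properties
            // Carries "connector"="iceberg-inlong", "streaming"="true", the catalog keys, and the uri.
            return node.tableOptions();
        }
    }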
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 257290119a..4d90d99c5a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -96,14 +96,14 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializa
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "iceberg-inlong");
+ options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
// for test sink.ignore.changelog
// options.put("sink.ignore.changelog", "true");
- options.put("catalog-database", dbName);
- options.put("catalog-table", tableName);
- options.put("default-database", dbName);
- options.put("catalog-type", catalogType.name());
- options.put("catalog-name", catalogType.name());
+ options.put(IcebergConstant.DATABASE_KEY, dbName);
+ options.put(IcebergConstant.TABLE_KEY, tableName);
+ options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
+ options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
+ options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
if (null != uri) {
options.put("uri", uri);
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
new file mode 100644
index 0000000000..5ec917debe
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class Iceberg2StarRocksSqlParserTest extends AbstractTestBase {
+
+ private String groupId = "b_test_wk_0801";
+ private String streamId = "b_test_wkstream_0801";
+
+ // iceberg
+ private String uri = "";
+ private String icDatabase = "";
+ private String icTable = "";
+ private String catalogName = "HIVE";
+ private String warehouse = "";
+
+ // starrocks
+ private String user = "";
+ private String password = "";
+ private String jdbc = "";
+ private String srDatabase = "";
+ private String srTable = "";
+ private String primaryKey = "id";
+ private String loadUrl = "";
+
+ private List<FieldInfo> fields() {
+ return Arrays.asList(
+ new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("source", new StringFormatInfo()),
+ new FieldInfo("count", new LongFormatInfo()),
+ new FieldInfo("remark", new StringFormatInfo()),
+ new FieldInfo("send_time", new StringFormatInfo()));
+ }
+
+ private List<FieldRelation> relations() {
+ return fields().stream()
+ .map(info -> new FieldRelation(info, info))
+ .collect(Collectors.toList());
+ }
+
+    private IcebergExtracNode buildIcebergExtracNode(String id) {
+        return new IcebergExtracNode(id, "iceberg-source", fields(), null, uri,
+                warehouse, icDatabase, icTable, IcebergConstant.CatalogType.HIVE, catalogName,
+                null, null, null);
+    }
+
+    private StarRocksLoadNode buildStarRocksLoadNode(String id) {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("sink.properties.format", "json");
+        properties.put("sink.properties.strip_outer_array", "true");
+        return new StarRocksLoadNode(id, "sink", fields(), relations(), null, null,
+                1, properties, jdbc, loadUrl, user, password, srDatabase,
+                srTable, primaryKey, null, null, null, null);
+    }
+
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Test
+    public void testIceberg() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildIcebergExtracNode("1");
+        Node outputNode = buildStarRocksLoadNode("2");
+        StreamInfo streamInfo = new StreamInfo(streamId, Arrays.asList(inputNode, outputNode),
+                Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo(groupId, Collections.singletonList(streamInfo));
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        System.out.println(objectMapper.writeValueAsString(groupInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
index adbaa7ae0b..83784f0290 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
@@ -57,6 +57,7 @@
<hudi.version>0.12.3</hudi.version>
<sqlserver.jdbc.version>7.2.2.jre8</sqlserver.jdbc.version>
<thrift.version>0.9.3</thrift.version>
+ <flink.iceberg.version>1.1.0</flink.iceberg.version>
</properties>
<dependencyManagement>
@@ -148,7 +149,7 @@
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.14</artifactId>
- <version>${iceberg.version}</version>
+ <version>${flink.iceberg.version}</version>
</dependency>
<!-- flink -->
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
index 0bbbcefe0d..2e039fedcd 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
@@ -143,11 +143,6 @@
<artifactId>mssql-jdbc</artifactId>
<version>${sqlserver.jdbc.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-runtime-1.14</artifactId>
- <version>${iceberg.version}</version>
- </dependency>
<!-- flink -->
<dependency>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
new file mode 100644
index 0000000000..c481dd8de6
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.15</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-iceberg-v1.15</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-iceberg</name>
+
+ <properties>
+ <iceberg-connector.version>1.3.1</iceberg-connector.version>
+ <flink.iceberg.version>1.15</flink.iceberg.version>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime-${flink.iceberg.version}</artifactId>
+            <version>${iceberg-connector.version}</version>
+            <scope>compile</scope>
+        </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-hive-runtime</artifactId>
+ <version>${iceberg-connector.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive3x.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-hive-metastore</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${plugin.shade.version}</version>
+
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+
+ <configuration>
+
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>com.google.protobuf:protobuf-java</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>org.apache.iceberg:*</include>
+                                    <include>org.apache.hive:*</include>
+                                    <!-- Include fixed version 18.0-13.0 of flink shaded guava -->
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>org.apache.thrift:*</include>
+                                    <include>com.facebook.*:*</include>
+                                </includes>
+                            </artifactSet>
+
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                            </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
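The relocation above rewrites the bundled sort-base classes into a shaded package inside the connector jar. A quick way to verify a built jar, using a hypothetical class name for illustration:

    public class ShadeCheckSketch {
        public static void main(String[] args) throws ClassNotFoundException {
            // Hypothetical: any org.apache.inlong.sort.base class bundled into the jar
            // should resolve only under the relocated package.
            Class<?> shaded = Class.forName(
                    "org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base.Constants");
            System.out.println(shaded.getName());
        }
    }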
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
new file mode 100644
index 0000000000..ba4298d9a1
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ *
+ * <p>The mapping between Flink database and Iceberg namespace: Supplying a base namespace for a
+ * given catalog, so if you have a catalog that supports a 2-level namespace, you would supply the
+ * first level in the catalog configuration and the second level would be exposed as Flink
+ * databases.
+ *
+ * <p>The Iceberg table manages its partitions by itself. The partition of the Iceberg table is
+ * independent of the partition of Flink.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+ private final CatalogLoader catalogLoader;
+ private final Catalog icebergCatalog;
+ private final Namespace baseNamespace;
+ private final SupportsNamespaces asNamespaceCatalog;
+ private final Closeable closeable;
+ private final boolean cacheEnabled;
+
+ public FlinkCatalog(
+ String catalogName,
+ String defaultDatabase,
+ Namespace baseNamespace,
+ CatalogLoader catalogLoader,
+ boolean cacheEnabled,
+ long cacheExpirationIntervalMs) {
+ super(catalogName, defaultDatabase);
+ this.catalogLoader = catalogLoader;
+ this.baseNamespace = baseNamespace;
+ this.cacheEnabled = cacheEnabled;
+
+        Catalog originalCatalog = catalogLoader.loadCatalog();
+        icebergCatalog =
+                cacheEnabled
+                        ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+                        : originalCatalog;
+        asNamespaceCatalog =
+                originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
+        closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+ FlinkEnvironmentContext.init();
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ throw new CatalogException(e);
+ }
+ }
+ }
+
+ public Catalog catalog() {
+ return icebergCatalog;
+ }
+
+ /** Append a new level to the base namespace */
+    private static Namespace appendLevel(Namespace baseNamespace, String newLevel) {
+        String[] namespace = new String[baseNamespace.levels().length + 1];
+        System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
+        namespace[baseNamespace.levels().length] = newLevel;
+        return Namespace.of(namespace);
+ }
+
+    TableIdentifier toIdentifier(ObjectPath path) {
+        String objectName = path.getObjectName();
+        List<String> tableName = Splitter.on('$').splitToList(objectName);
+
+        if (tableName.size() == 1) {
+            return TableIdentifier.of(
+                    appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName());
+        } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) {
+            return TableIdentifier.of(
+                    appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)),
+                    tableName.get(1));
+        } else {
+            throw new IllegalArgumentException("Illegal table name:" + objectName);
+        }
+    }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ if (asNamespaceCatalog == null) {
+ return Collections.singletonList(getDefaultDatabase());
+ }
+
+ return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
+ .map(n -> n.level(n.levels().length - 1))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog == null) {
+ if (!getDefaultDatabase().equals(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } else {
+ return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+ }
+ } else {
+ try {
+                Map<String, String> metadata =
+                        Maps.newHashMap(
+                                asNamespaceCatalog.loadNamespaceMetadata(appendLevel(baseNamespace, databaseName)));
+                String comment = metadata.remove("comment");
+                return new CatalogDatabaseImpl(metadata, comment);
+            } catch (NoSuchNamespaceException e) {
+                throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ try {
+ getDatabase(databaseName);
+ return true;
+ } catch (DatabaseNotExistException ignore) {
+ return false;
+ }
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ createDatabase(
+ name, mergeComment(database.getProperties(),
database.getComment()), ignoreIfExists);
+ }
+
+ private void createDatabase(
+ String databaseName, Map<String, String> metadata, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ asNamespaceCatalog.createNamespace(appendLevel(baseNamespace,
databaseName), metadata);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(getName(),
databaseName, e);
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "Namespaces are not supported by catalog: " + getName());
+ }
+ }
+
+ private Map<String, String> mergeComment(Map<String, String> metadata,
String comment) {
+ Map<String, String> ret = Maps.newHashMap(metadata);
+ if (metadata.containsKey("comment")) {
+ throw new CatalogException("Database properties should not contain
key: 'comment'.");
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ ret.put("comment", comment);
+ }
+ return ret;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ boolean success =
asNamespaceCatalog.dropNamespace(appendLevel(baseNamespace, name));
+ if (!success && !ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ } catch (NamespaceNotEmptyException e) {
+ throw new DatabaseNotEmptyException(getName(), name, e);
+ }
+ } else {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase,
boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ Namespace namespace = appendLevel(baseNamespace, name);
+ Map<String, String> updates = Maps.newHashMap();
+ Set<String> removals = Sets.newHashSet();
+
+ try {
+ Map<String, String> oldProperties =
asNamespaceCatalog.loadNamespaceMetadata(namespace);
+ Map<String, String> newProperties =
+ mergeComment(newDatabase.getProperties(),
newDatabase.getComment());
+
+ for (String key : oldProperties.keySet()) {
+ if (!newProperties.containsKey(key)) {
+ removals.add(key);
+ }
+ }
+
+ for (Map.Entry<String, String> entry :
newProperties.entrySet()) {
+ if
(!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
+ updates.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (!updates.isEmpty()) {
+ asNamespaceCatalog.setProperties(namespace, updates);
+ }
+
+ if (!removals.isEmpty()) {
+ asNamespaceCatalog.removeProperties(namespace, removals);
+ }
+
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ }
+ } else {
+ if (getDefaultDatabase().equals(name)) {
+                throw new CatalogException(
+                        "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
+ }
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ try {
+ return icebergCatalog.listTables(appendLevel(baseNamespace,
databaseName)).stream()
+ .map(TableIdentifier::name)
+ .collect(Collectors.toList());
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+
+ @Override
+ public CatalogTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ Table table = loadIcebergTable(tablePath);
+ return toCatalogTable(table);
+ }
+
+ private Table loadIcebergTable(ObjectPath tablePath) throws
TableNotExistException {
+ try {
+ Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+ if (cacheEnabled) {
+ table.refresh();
+ }
+
+ return table;
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return icebergCatalog.tableExists(toIdentifier(tablePath));
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ icebergCatalog.dropTable(toIdentifier(tablePath));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException,
CatalogException {
+ try {
+ icebergCatalog.renameTable(
+ toIdentifier(tablePath),
+ toIdentifier(new ObjectPath(tablePath.getDatabaseName(),
newTableName)));
+ } catch (NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws CatalogException, TableAlreadyExistException {
+        if (Objects.equals(
+                table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+            throw new IllegalArgumentException(
+                    "Cannot create the table with 'connector'='iceberg' table property in "
+                            + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+                            + "create table without 'connector'='iceberg' related properties in an iceberg table.");
+        }
+
+ createIcebergTable(tablePath, table, ignoreIfExists);
+ }
+
+ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+        Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+        PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+        String location = null;
+        for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath), icebergSchema, spec, location,
properties.build());
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+ }
+
+ private static void validateTableSchemaAndPartition(CatalogTable ct1,
CatalogTable ct2) {
+ TableSchema ts1 = ct1.getSchema();
+ TableSchema ts2 = ct2.getSchema();
+ boolean equalsPrimary = false;
+
+ if (ts1.getPrimaryKey().isPresent() &&
ts2.getPrimaryKey().isPresent()) {
+ equalsPrimary =
+ Objects.equals(ts1.getPrimaryKey().get().getType(),
ts2.getPrimaryKey().get().getType())
+ && Objects.equals(
+ ts1.getPrimaryKey().get().getColumns(),
ts2.getPrimaryKey().get().getColumns());
+ } else if (!ts1.getPrimaryKey().isPresent() &&
!ts2.getPrimaryKey().isPresent()) {
+ equalsPrimary = true;
+ }
+
+ if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
+ && Objects.equals(ts1.getWatermarkSpecs(),
ts2.getWatermarkSpecs())
+ && equalsPrimary)) {
+ throw new UnsupportedOperationException("Altering schema is not
supported yet.");
+ }
+
+ if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys
is not supported yet.");
+ }
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable,
boolean ignoreIfNotExists)
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+
+ Table icebergTable;
+ try {
+ icebergTable = loadIcebergTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ } else {
+ return;
+ }
+ }
+
+ CatalogTable table = toCatalogTable(icebergTable);
+
+        // Currently, Flink SQL only supports altering table properties.
+
+        // For the current Flink Catalog API, support for adding/removing/renaming columns
+        // cannot be done by comparing CatalogTable instances, unless the Flink schema
+        // contains Iceberg column IDs.
+ validateTableSchemaAndPartition(table, (CatalogTable) newTable);
+
+ Map<String, String> oldProperties = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry :
newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldProperties.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldProperties
+ .keySet()
+ .forEach(
+ k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId,
pickSnapshotId, setProperties);
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(
+ table instanceof CatalogTable, "The Table should be a
CatalogTable.");
+
+ TableSchema schema = table.getSchema();
+ schema
+ .getTableColumns()
+ .forEach(
+ column -> {
+ if
(!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
+ throw new UnsupportedOperationException(
+ "Creating table with computed columns
is not supported yet.");
+ }
+ });
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "Creating table with watermark specs is not supported
yet.");
+ }
+ }
+
+ private static PartitionSpec toPartitionSpec(List<String> partitionKeys,
Schema icebergSchema) {
+ PartitionSpec.Builder builder =
PartitionSpec.builderFor(icebergSchema);
+ partitionKeys.forEach(builder::identity);
+ return builder.build();
+ }
+
+ private static List<String> toPartitionKeys(PartitionSpec spec, Schema
icebergSchema) {
+ ImmutableList.Builder<String> partitionKeysBuilder =
ImmutableList.builder();
+ for (PartitionField field : spec.fields()) {
+ if (field.transform().isIdentity()) {
+
partitionKeysBuilder.add(icebergSchema.findColumnName(field.sourceId()));
+ } else {
+ // Not created by Flink SQL.
+ // For compatibility with iceberg tables, return empty.
+ // TODO modify this after Flink support partition transform.
+ return Collections.emptyList();
+ }
+ }
+ return partitionKeysBuilder.build();
+ }
+
+ private static void commitChanges(
+ Table table,
+ String setLocation,
+ String setSnapshotId,
+ String pickSnapshotId,
+ Map<String, String> setProperties) {
+        // don't allow setting the snapshot and picking a commit at the same time
+        // because order is ambiguous and choosing one order leads to different results
+ Preconditions.checkArgument(
+ setSnapshotId == null || pickSnapshotId == null,
+ "Cannot set the current snapshot ID and cherry-pick snapshot
changes");
+
+ if (setSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(setSnapshotId);
+ table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+ }
+
+ // if updating the table snapshot, perform that update first in case
it fails
+ if (pickSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(pickSnapshotId);
+ table.manageSnapshots().cherrypick(newSnapshotId).commit();
+ }
+
+ Transaction transaction = table.newTransaction();
+
+ if (setLocation != null) {
+ transaction.updateLocation().setLocation(setLocation).commit();
+ }
+
+ if (!setProperties.isEmpty()) {
+ UpdateProperties updateProperties = transaction.updateProperties();
+ setProperties.forEach(
+ (k, v) -> {
+ if (v == null) {
+ updateProperties.remove(k);
+ } else {
+ updateProperties.set(k, v);
+ }
+ });
+ updateProperties.commit();
+ }
+
+ transaction.commitTransaction();
+ }
+
+ static CatalogTable toCatalogTable(Table table) {
+ TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+ List<String> partitionKeys = toPartitionKeys(table.spec(),
table.schema());
+
+        // NOTE: We can not create an IcebergCatalogTable extending CatalogTable, because the
+        // Flink optimizer may use CatalogTableImpl to copy a new catalog table.
+        // Let's re-load the table from the Iceberg catalog when creating source/sink operators.
+        // Iceberg does not have a table comment, so pass null (the default comment value in Flink).
+ return new CatalogTableImpl(schema, partitionKeys, table.properties(),
null);
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new FlinkDynamicTableFactory(this));
+ }
+
+ CatalogLoader getCatalogLoader() {
+ return catalogLoader;
+ }
+
+ // ------------------------------ Unsupported methods
+ // ---------------------------------------------
+
+ @Override
+ public List<String> listViews(String databaseName) throws CatalogException
{
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean
ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath)
+ throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws
CatalogException {
+ return false;
+ }
+
+ @Override
+ public void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean
ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean
ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics,
boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(
+ ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ Table table = loadIcebergTable(tablePath);
+
+ if (table.spec().isUnpartitioned()) {
+ throw new TableNotPartitionedException(icebergCatalog.name(),
tablePath);
+ }
+
+ Set<CatalogPartitionSpec> set = Sets.newHashSet();
+ try (CloseableIterable<FileScanTask> tasks =
table.newScan().planFiles()) {
+ for (DataFile dataFile : CloseableIterable.transform(tasks,
FileScanTask::file)) {
+ Map<String, String> map = Maps.newHashMap();
+ StructLike structLike = dataFile.partition();
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ for (int i = 0; i < structLike.size(); i++) {
+ map.put(spec.fields().get(i).name(),
String.valueOf(structLike.get(i, Object.class)));
+ }
+ set.add(new CatalogPartitionSpec(map));
+ }
+ } catch (IOException e) {
+ throw new CatalogException(
+ String.format("Failed to list partitions of table %s",
tablePath), e);
+ }
+
+ return Lists.newArrayList(set);
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(
+ ObjectPath tablePath, List<Expression> filters) throws
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+    // After partition pruning and filter push down, the statistics have become very
+    // inaccurate, so the statistics from here are of little significance.
+    // Flink will support something like SupportsReportStatistics in the future.
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath
tablePath)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws
CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws
CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+}
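As a usage sketch, the wrapped catalog is normally registered through Flink SQL DDL; the catalog factory identifier and connection values below are assumptions, not taken from this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CatalogDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // 'type' must match the identifier FlinkCatalogFactory is registered under
            // (upstream Iceberg uses "iceberg"); uri/warehouse are placeholders.
            tEnv.executeSql(
                    "CREATE CATALOG iceberg_hive WITH ("
                            + " 'type'='iceberg',"
                            + " 'catalog-type'='hive',"
                            + " 'uri'='thrift://localhost:9083',"
                            + " 'warehouse'='hdfs://namenode:8020/warehouse'"
                            + ")");
            tEnv.executeSql("USE CATALOG iceberg_hive");
        }
    }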
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
new file mode 100644
index 0000000000..4adf4a3ed8
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link org.apache.iceberg.flink.FlinkCatalog}.
+ *
+ * <p>This supports the following catalog configuration options:
+ *
+ * <ul>
+ * <li><code>type</code> - Flink catalog factory key, should be "iceberg"
+ * <li><code>catalog-type</code> - iceberg catalog type, "hive", "hadoop" or "rest"
+ * <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)
+ * <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)
+ * <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)
+ * <li><code>default-database</code> - a database name to use as the default
+ * <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop
+ * catalog only)
+ * <li><code>cache-enabled</code> - whether to enable catalog cache
+ * </ul>
+ *
+ * <p>To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+ // Can not just use "type", it conflicts with CATALOG_TYPE.
+ public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+ public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+ public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+ public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
+
+ public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
+ public static final String DEFAULT_DATABASE = "default-database";
+ public static final String DEFAULT_DATABASE_NAME = "default";
+ public static final String BASE_NAMESPACE = "base-namespace";
+
+ public static final String TYPE = "type";
+ public static final String PROPERTY_VERSION = "property-version";
+
+ /**
+ * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink
+ * catalog adapter.
+ *
+ * @param name Flink's catalog name
+ * @param properties Flink's catalog properties
+ * @param hadoopConf Hadoop configuration for catalog
+ * @return an Iceberg catalog loader
+ */
+ static CatalogLoader createCatalogLoader(
+ String name, Map<String, String> properties, Configuration hadoopConf) {
+ String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
+ if (catalogImpl != null) {
+ String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+ Preconditions.checkArgument(
+ catalogType == null,
+ "Cannot create catalog %s, both catalog-type and
catalog-impl are set: catalog-type=%s, catalog-impl=%s",
+ name,
+ catalogType,
+ catalogImpl);
+ return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
+ }
+
+ String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+ switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+ case ICEBERG_CATALOG_TYPE_HIVE:
+ // The values of properties 'uri', 'warehouse' and 'hive-conf-dir' are allowed to be null;
+ // in that case they fall back to the values parsed from the Hadoop configuration loaded
+ // from the classpath.
+ String hiveConfDir = properties.get(HIVE_CONF_DIR);
+ String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+ Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
+ return CatalogLoader.hive(name, newHadoopConf, properties);
+
+ case ICEBERG_CATALOG_TYPE_HADOOP:
+ return CatalogLoader.hadoop(name, hadoopConf, properties);
+
+ case ICEBERG_CATALOG_TYPE_REST:
+ return CatalogLoader.rest(name, hadoopConf, properties);
+
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown catalog-type: " + catalogType + " (Must be
'hive', 'hadoop' or 'rest')");
+ }
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = Maps.newHashMap();
+ context.put(TYPE, "iceberg");
+ context.put(PROPERTY_VERSION, "1");
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ return ImmutableList.of("*");
+ }
+
+ @Override
+ public Catalog createCatalog(String name, Map<String, String> properties) {
+ return createCatalog(name, properties, clusterHadoopConf());
+ }
+
+ protected Catalog createCatalog(
+ String name, Map<String, String> properties, Configuration hadoopConf) {
+ CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
+ String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
+
+ Namespace baseNamespace = Namespace.empty();
+ if (properties.containsKey(BASE_NAMESPACE)) {
+ baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+ }
+
+ boolean cacheEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+ long cacheExpirationIntervalMs =
+ PropertyUtil.propertyAsLong(
+ properties,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+ Preconditions.checkArgument(
+ cacheExpirationIntervalMs != 0,
+ "%s is not allowed to be 0.",
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+ return new FlinkCatalog(
+ name,
+ defaultDatabase,
+ baseNamespace,
+ catalogLoader,
+ cacheEnabled,
+ cacheExpirationIntervalMs);
+ }
+
+ private static Configuration mergeHiveConf(
+ Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
+ Configuration newConf = new Configuration(hadoopConf);
+ if (!Strings.isNullOrEmpty(hiveConfDir)) {
+ Preconditions.checkState(
+ Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
+ "There should be a hive-site.xml file under the directory
%s",
+ hiveConfDir);
+ newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+ } else {
+ // If the hive-site.xml path is not provided explicitly, try to load the resource from
+ // the classpath. If the configuration file still cannot be loaded, HiveCatalog will
+ // throw an exception.
+ URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+ if (configFile != null) {
+ newConf.addResource(configFile);
+ }
+ }
+
+ if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+ Preconditions.checkState(
+ Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+ "Failed to load Hadoop configuration: missing %s",
+ Paths.get(hadoopConfDir, "hdfs-site.xml"));
+ newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+ Preconditions.checkState(
+ Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+ "Failed to load Hadoop configuration: missing %s",
+ Paths.get(hadoopConfDir, "core-site.xml"));
+ newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+ }
+
+ return newConf;
+ }
+
+ public static Configuration clusterHadoopConf() {
+ return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+ }
+}
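FlinkCatalogFactory is picked up through Flink's legacy TableFactory discovery (requiredContext() pins type=iceberg and property-version=1), and createCatalogLoader() dispatches on catalog-type, defaulting to hive; passing catalog-impl together with catalog-type is rejected by the Preconditions check. A sketch of registering such a catalog from DDL, following the options listed in the class Javadoc above; the metastore URI and warehouse path are hypothetical placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterIcebergCatalog {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hive-type catalog; 'uri' and 'warehouse' below are hypothetical placeholders.
        tEnv.executeSql(
                "CREATE CATALOG hive_iceberg WITH ("
                        + " 'type' = 'iceberg',"
                        + " 'catalog-type' = 'hive',"
                        + " 'uri' = 'thrift://localhost:9083',"
                        + " 'warehouse' = 'hdfs://namenode:8020/user/hive/warehouse',"
                        + " 'default-database' = 'default',"
                        + " 'cache-enabled' = 'true'"
                        + ")");
        tEnv.useCatalog("hive_iceberg");
    }
}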
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
new file mode 100644
index 0000000000..1edf546c23
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.IcebergTableSink;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.IcebergTableSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+ static final String FACTORY_IDENTIFIER = "iceberg-inlong";
+
+ private static final ConfigOption<String> CATALOG_NAME =
+ ConfigOptions.key("catalog-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog name");
+
+ private static final ConfigOption<String> CATALOG_TYPE =
+ ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog type, the optional types are:
custom, hadoop, hive.");
+
+ private static final ConfigOption<String> CATALOG_DATABASE =
+ ConfigOptions.key("catalog-database")
+ .stringType()
+ .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+ .withDescription("Database name managed in the iceberg
catalog.");
+
+ private static final ConfigOption<String> CATALOG_TABLE =
+ ConfigOptions.key("catalog-table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name managed in the underlying
iceberg catalog and database.");
+
+ private final FlinkCatalog catalog;
+
+ public FlinkDynamicTableFactory() {
+ this.catalog = null;
+ }
+
+ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+
+ ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+ CatalogTable catalogTable = context.getCatalogTable();
+ Map<String, String> tableProps = catalogTable.getOptions();
+ TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+ TableLoader tableLoader =
+ createTableLoader(
+ catalogTable,
+ tableProps,
+ objectIdentifier.getDatabaseName(),
+ objectIdentifier.getObjectName());
+ return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+ CatalogTable catalogTable = context.getCatalogTable();
+ Map<String, String> writeProps = catalogTable.getOptions();
+ TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog, objectPath);
+ } else {
+ tableLoader =
+ createTableLoader(
+ catalogTable, writeProps, objectPath.getDatabaseName(), objectPath.getObjectName());
+ }
+ return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps);
+ }
+
+ private static TableLoader createTableLoader(
+ CatalogBaseTable catalogBaseTable,
+ Map<String, String> tableProps,
+ String databaseName,
+ String tableName) {
+ Configuration flinkConf = new Configuration();
+ tableProps.forEach(flinkConf::setString);
+
+ String catalogName = flinkConf.getString(CATALOG_NAME);
+ Preconditions.checkNotNull(
+ catalogName, "Table property '%s' cannot be null",
CATALOG_NAME.key());
+
+ String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+ Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
+
+ String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+ Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
+
+ org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+ FlinkCatalogFactory factory = new FlinkCatalogFactory();
+ FlinkCatalog flinkCatalog =
+ (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
+ ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+ // Create database if not exists in the external catalog.
+ if (!flinkCatalog.databaseExists(catalogDatabase)) {
+ try {
+ flinkCatalog.createDatabase(
+ catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+ } catch (DatabaseAlreadyExistException e) {
+ throw new AlreadyExistsException(
+ e,
+ "Database %s already exists in the iceberg catalog
%s.",
+ catalogName,
+ catalogDatabase);
+ }
+ }
+
+ // Create table if not exists in the external catalog.
+ if (!flinkCatalog.tableExists(objectPath)) {
+ try {
+ flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+ } catch (TableAlreadyExistException e) {
+ throw new AlreadyExistsException(
+ e,
+ "Table %s already exists in the database %s and
catalog %s",
+ catalogTable,
+ catalogDatabase,
+ catalogName);
+ }
+ }
+
+ return TableLoader.fromCatalog(
+ flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = Sets.newHashSet();
+ options.add(CATALOG_TYPE);
+ options.add(CATALOG_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = Sets.newHashSet();
+ options.add(CATALOG_DATABASE);
+ options.add(CATALOG_TABLE);
+ return options;
+ }
+
+ private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+ Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+ return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
+ }
+}
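FlinkDynamicTableFactory answers to the connector identifier iceberg-inlong with only catalog-name and catalog-type required; catalog-database and catalog-table fall back to the Flink database and table names, and createTableLoader() creates the backing database and table in the external catalog when they are missing. A sketch of declaring and reading such a table, again with hypothetical placeholder names and paths:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergInlongSourceDemo {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'catalog-name' and 'catalog-type' are the factory's two required options;
        // the warehouse path and identifiers are hypothetical placeholders.
        tEnv.executeSql(
                "CREATE TABLE iceberg_src ("
                        + " id BIGINT,"
                        + " name STRING"
                        + ") WITH ("
                        + " 'connector' = 'iceberg-inlong',"
                        + " 'catalog-name' = 'demo_catalog',"
                        + " 'catalog-type' = 'hadoop',"
                        + " 'warehouse' = 'file:///tmp/iceberg-warehouse',"
                        + " 'catalog-database' = 'my_db',"
                        + " 'catalog-table' = 'my_table'"
                        + ")");
        tEnv.executeSql("SELECT * FROM iceberg_src").print();
    }
}

Since catalog-database and catalog-table default to the Flink identifiers, both options can be omitted when the names already match.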
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
new file mode 100644
index 0000000000..2a04b4ee7e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.iceberg.EnvironmentContext;
+import org.apache.iceberg.flink.util.FlinkPackage;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class FlinkEnvironmentContext {
+
+ private FlinkEnvironmentContext() {
+ }
+
+ public static void init() {
+ EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+ EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, FlinkPackage.version());
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..7d1e60eab4
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..254f72875d
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.iceberg.FlinkCatalogFactory
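Both service files are standard java.util.ServiceLoader registrations, which is the mechanism Flink's factory discovery relies on. A quick self-check sketch, assuming the shaded connector jar is on the classpath, to verify the iceberg-inlong factory is discoverable:

import org.apache.flink.table.factories.Factory;

import java.util.ServiceLoader;

public class FactoryDiscoveryCheck {

    public static void main(String[] args) {
        // Flink discovers dynamic-table factories via ServiceLoader; this loop mirrors that lookup.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if ("iceberg-inlong".equals(factory.factoryIdentifier())) {
                System.out.println("Found factory: " + factory.getClass().getName());
            }
        }
    }
}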
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index e81c324c04..9f360dd169 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>postgres-cdc</module>
<module>starrocks</module>
+ <module>iceberg</module>
</modules>
<properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index cc6d1b407c..9a82ec18d8 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -738,6 +738,14 @@
Source : com.starrocks:flink-connector-starrocks:1.2.7_flink-1.15 (Please note that the software have been modified.)
License : https://www.apache.org/licenses/LICENSE-2.0.txt
+ 1.3.17 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
+
+ Source : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the software have been modified.)
+ License : https://github.com/apache/iceberg/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
diff --git a/pom.xml b/pom.xml
index 5e7c0d328f..ca8a551c21 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,7 +62,7 @@
<plugin.assembly.version>3.2.0</plugin.assembly.version>
<plugin.surefire.version>3.0.0-M7</plugin.surefire.version>
<plugin.failsafe.version>3.0.0-M7</plugin.failsafe.version>
- <plugin.shade.version>3.2.4</plugin.shade.version>
+ <plugin.shade.version>3.4.0</plugin.shade.version>
<plugin.maven.source>3.0.1</plugin.maven.source>
<plugin.maven.jar.version>3.2.0</plugin.maven.jar.version>
<exec.maven.version>1.6.0</exec.maven.version>
@@ -153,7 +153,7 @@
<zookeeper.version>3.6.3</zookeeper.version>
<pulsar.version>2.8.4</pulsar.version>
<kafka.version>2.4.1</kafka.version>
- <iceberg.version>1.1.0</iceberg.version>
+ <iceberg.version>1.3.1</iceberg.version>
<flink.version.v1.13>1.13.5</flink.version.v1.13>
<flink.version.v1.15>1.15.4</flink.version.v1.15>
<flink.minor.version>1.13</flink.minor.version>