This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bcaebe3af [INLONG-5131][Sort] Import all changelog mode data ingest 
into Iceberg or DLC (#5141)
bcaebe3af is described below

commit bcaebe3afd44af8279d55f203fbfad27afad797a
Author: Oneal65 <[email protected]>
AuthorDate: Tue Jul 26 12:16:22 2022 +0800

    [INLONG-5131][Sort] Import all changelog mode data ingest into Iceberg or 
DLC (#5141)
---
 .../protocol/node/load/DLCIcebergLoadNode.java     |   4 +-
 .../sort/protocol/node/load/IcebergLoadNode.java   |   4 +-
 inlong-sort/sort-connectors/iceberg-dlc/pom.xml    |   5 +-
 .../apache/inlong/sort/iceberg/FlinkCatalog.java   | 731 +++++++++++++++++++++
 .../inlong/sort/iceberg/FlinkCatalogFactory.java   | 166 +++++
 .../inlong/sort/iceberg/FlinkConfigOptions.java    |  47 ++
 .../sort/iceberg/FlinkDynamicTableFactory.java     | 218 ++++++
 .../inlong/sort/iceberg/IcebergTableSink.java      | 119 ++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 licenses/inlong-sort-connectors/LICENSE            |   9 +
 10 files changed, 1315 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
index 41891ad8f..2d96febc5 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
@@ -90,7 +90,9 @@ public class DLCIcebergLoadNode extends LoadNode implements 
Serializable {
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "iceberg");
+        options.put("connector", "iceberg-inlong");
+        // for test sink.ignore.changelog
+        // options.put("sink.ignore.changelog", "true");
         options.put("catalog-database", dbName);
         options.put("catalog-table", tableName);
         options.put("default-database", dbName);
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 2d649c288..3752c978c 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -94,7 +94,9 @@ public class IcebergLoadNode extends LoadNode implements 
Serializable {
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "iceberg");
+        options.put("connector", "iceberg-inlong");
+        // for test sink.ignore.changelog
+        // options.put("sink.ignore.changelog", "true");
         options.put("catalog-database", dbName);
         options.put("catalog-table", tableName);
         options.put("default-database", dbName);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml 
b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
index a672631d0..0e1e9f571 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
@@ -88,8 +88,9 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink-runtime-1.13</artifactId>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-iceberg</artifactId>
+            <version>1.3.0-SNAPSHOT</version>
         </dependency>
 
         <dependency>
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
new file mode 100644
index 000000000..2056a8d02
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ * <p>
+ * The mapping between Flink database and Iceberg namespace:
+ * Supplying a base namespace for a given catalog, so if you have a catalog 
that supports a 2-level namespace, you
+ * would supply the first level in the catalog configuration and the second 
level would be exposed as Flink databases.
+ * </p>
+ * The Iceberg table manages its partitions by itself. The partition of the 
Iceberg table is independent of the
+ * partition of Flink.
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+    private final CatalogLoader catalogLoader;
+    private final Catalog icebergCatalog;
+    private final Namespace baseNamespace;
+    private final SupportsNamespaces asNamespaceCatalog;
+    private final Closeable closeable;
+    private final boolean cacheEnabled;
+
+    public FlinkCatalog(
+            String catalogName,
+            String defaultDatabase,
+            Namespace baseNamespace,
+            CatalogLoader catalogLoader,
+            boolean cacheEnabled) {
+        super(catalogName, defaultDatabase);
+        this.catalogLoader = catalogLoader;
+        this.baseNamespace = baseNamespace;
+        this.cacheEnabled = cacheEnabled;
+
+        Catalog originalCatalog = catalogLoader.loadCatalog();
+        icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : 
originalCatalog;
+        asNamespaceCatalog =
+                originalCatalog instanceof SupportsNamespaces ? 
(SupportsNamespaces) originalCatalog : null;
+        closeable = originalCatalog instanceof Closeable ? (Closeable) 
originalCatalog : null;
+    }
+
+    private static void validateFlinkTable(CatalogBaseTable table) {
+        Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+        TableSchema schema = table.getSchema();
+        schema.getTableColumns().forEach(column -> {
+            if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
+                throw new UnsupportedOperationException("Creating table with 
computed columns is not supported yet.");
+            }
+        });
+
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            throw new UnsupportedOperationException("Creating table with 
watermark specs is not supported yet.");
+        }
+    }
+
+    private static PartitionSpec toPartitionSpec(List<String> partitionKeys, 
Schema icebergSchema) {
+        PartitionSpec.Builder builder = 
PartitionSpec.builderFor(icebergSchema);
+        partitionKeys.forEach(builder::identity);
+        return builder.build();
+    }
+
+    private static List<String> toPartitionKeys(PartitionSpec spec, Schema 
icebergSchema) {
+        List<String> partitionKeys = Lists.newArrayList();
+        for (PartitionField field : spec.fields()) {
+            if (field.transform().isIdentity()) {
+                
partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+            } else {
+                // Not created by Flink SQL.
+                // For compatibility with iceberg tables, return empty.
+                // TODO modify this after Flink support partition transform.
+                return Collections.emptyList();
+            }
+        }
+        return partitionKeys;
+    }
+
+    private static void commitChanges(Table table, String setLocation, String 
setSnapshotId,
+            String pickSnapshotId, Map<String, String> setProperties) {
+        // don't allow setting the snapshot and picking a commit
+        // at the same time because order is ambiguous and choosing
+        // one order leads to different results
+        Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == 
null,
+                "Cannot set the current snapshot ID and cherry-pick snapshot 
changes");
+
+        if (setSnapshotId != null) {
+            long newSnapshotId = Long.parseLong(setSnapshotId);
+            table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+        }
+
+        // if updating the table snapshot, perform that update first in case 
it fails
+        if (pickSnapshotId != null) {
+            long newSnapshotId = Long.parseLong(pickSnapshotId);
+            table.manageSnapshots().cherrypick(newSnapshotId).commit();
+        }
+
+        Transaction transaction = table.newTransaction();
+
+        if (setLocation != null) {
+            transaction.updateLocation()
+                    .setLocation(setLocation)
+                    .commit();
+        }
+
+        if (!setProperties.isEmpty()) {
+            UpdateProperties updateProperties = transaction.updateProperties();
+            setProperties.forEach((k, v) -> {
+                if (v == null) {
+                    updateProperties.remove(k);
+                } else {
+                    updateProperties.set(k, v);
+                }
+            });
+            updateProperties.commit();
+        }
+
+        transaction.commitTransaction();
+    }
+
+    static CatalogTable toCatalogTable(Table table) {
+        TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+        List<String> partitionKeys = toPartitionKeys(table.spec(), 
table.schema());
+
+        // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, 
because Flink optimizer may use
+        // CatalogTableImpl to copy a new catalog table.
+        // Let's re-loading table from Iceberg catalog when creating 
source/sink operators.
+        // Iceberg does not have Table comment, so pass a null (Default 
comment value in Flink).
+        return new CatalogTableImpl(schema, partitionKeys, table.properties(), 
null);
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        // Create the default database if it does not exist.
+        try {
+            createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
+        } catch (DatabaseAlreadyExistException e) {
+            // Ignore the exception if it's already exist.
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                throw new CatalogException(e);
+            }
+        }
+    }
+
+    public Catalog catalog() {
+        return icebergCatalog;
+    }
+
+    private Namespace toNamespace(String database) {
+        String[] namespace = new String[baseNamespace.levels().length + 1];
+        System.arraycopy(baseNamespace.levels(), 0, namespace, 0, 
baseNamespace.levels().length);
+        namespace[baseNamespace.levels().length] = database;
+        return Namespace.of(namespace);
+    }
+
+    TableIdentifier toIdentifier(ObjectPath path) {
+        return TableIdentifier.of(toNamespace(path.getDatabaseName()), 
path.getObjectName());
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        if (asNamespaceCatalog == null) {
+            return Collections.singletonList(getDefaultDatabase());
+        }
+
+        return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
+                .map(n -> n.level(n.levels().length - 1))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        if (asNamespaceCatalog == null) {
+            if (!getDefaultDatabase().equals(databaseName)) {
+                throw new DatabaseNotExistException(getName(), databaseName);
+            } else {
+                return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+            }
+        } else {
+            try {
+                Map<String, String> metadata =
+                        
Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
+                String comment = metadata.remove("comment");
+                return new CatalogDatabaseImpl(metadata, comment);
+            } catch (NoSuchNamespaceException e) {
+                throw new DatabaseNotExistException(getName(), databaseName, 
e);
+            }
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        try {
+            getDatabase(databaseName);
+            return true;
+        } catch (DatabaseNotExistException ignore) {
+            return false;
+        }
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean 
ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        createDatabase(name, mergeComment(database.getProperties(), 
database.getComment()), ignoreIfExists);
+    }
+
+    private void createDatabase(String databaseName, Map<String, String> 
metadata, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            try {
+                asNamespaceCatalog.createNamespace(toNamespace(databaseName), 
metadata);
+            } catch (AlreadyExistsException e) {
+                if (!ignoreIfExists) {
+                    throw new DatabaseAlreadyExistException(getName(), 
databaseName, e);
+                }
+            }
+        } else {
+            throw new UnsupportedOperationException("Namespaces are not 
supported by catalog: " + getName());
+        }
+    }
+
+    private Map<String, String> mergeComment(Map<String, String> metadata, 
String comment) {
+        Map<String, String> ret = Maps.newHashMap(metadata);
+        if (metadata.containsKey("comment")) {
+            throw new CatalogException("Database properties should not contain 
key: 'comment'.");
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+            ret.put("comment", comment);
+        }
+        return ret;
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean 
cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        if (asNamespaceCatalog != null) {
+            try {
+                boolean success = 
asNamespaceCatalog.dropNamespace(toNamespace(name));
+                if (!success && !ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name);
+                }
+            } catch (NoSuchNamespaceException e) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name, e);
+                }
+            } catch (NamespaceNotEmptyException e) {
+                throw new DatabaseNotEmptyException(getName(), name, e);
+            }
+        } else {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+        }
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, 
boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            Namespace namespace = toNamespace(name);
+            Map<String, String> updates = Maps.newHashMap();
+            Set<String> removals = Sets.newHashSet();
+
+            try {
+                Map<String, String> oldProperties = 
asNamespaceCatalog.loadNamespaceMetadata(namespace);
+                Map<String, String> newProperties = 
mergeComment(newDatabase.getProperties(), newDatabase.getComment());
+
+                for (String key : oldProperties.keySet()) {
+                    if (!newProperties.containsKey(key)) {
+                        removals.add(key);
+                    }
+                }
+
+                for (Map.Entry<String, String> entry : 
newProperties.entrySet()) {
+                    if 
(!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
+                        updates.put(entry.getKey(), entry.getValue());
+                    }
+                }
+
+                if (!updates.isEmpty()) {
+                    asNamespaceCatalog.setProperties(namespace, updates);
+                }
+
+                if (!removals.isEmpty()) {
+                    asNamespaceCatalog.removeProperties(namespace, removals);
+                }
+
+            } catch (NoSuchNamespaceException e) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name, e);
+                }
+            }
+        } else {
+            if (getDefaultDatabase().equals(name)) {
+                throw new CatalogException(
+                        "Can not alter the default database when the iceberg 
catalog doesn't support namespaces.");
+            }
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+        try {
+            return 
icebergCatalog.listTables(toNamespace(databaseName)).stream()
+                    .map(TableIdentifier::name)
+                    .collect(Collectors.toList());
+        } catch (NoSuchNamespaceException e) {
+            throw new DatabaseNotExistException(getName(), databaseName, e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+        Table table = loadIcebergTable(tablePath);
+        return toCatalogTable(table);
+    }
+
+    private Table loadIcebergTable(ObjectPath tablePath) throws 
TableNotExistException {
+        try {
+            Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+            if (cacheEnabled) {
+                table.refresh();
+            }
+
+            return table;
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            throw new TableNotExistException(getName(), tablePath, e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return icebergCatalog.tableExists(toIdentifier(tablePath));
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            icebergCatalog.dropTable(toIdentifier(tablePath));
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath, e);
+            }
+        }
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, 
CatalogException {
+        try {
+            icebergCatalog.renameTable(
+                    toIdentifier(tablePath),
+                    toIdentifier(new ObjectPath(tablePath.getDatabaseName(), 
newTableName)));
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath, e);
+            }
+        } catch (AlreadyExistsException e) {
+            throw new TableAlreadyExistException(getName(), tablePath, e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+            throws CatalogException, TableAlreadyExistException {
+        if (Objects.equals(table.getOptions().get("connector"), 
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+            throw new IllegalArgumentException("Cannot create the table with 
'connector'='iceberg' table property in "
+                    + "an iceberg catalog, Please create table with 
'connector'='iceberg' "
+                    + "property in a non-iceberg catalog "
+                    + "or create table without 'connector'='iceberg' related 
properties in an iceberg table.");
+        }
+
+        createIcebergTable(tablePath, table, ignoreIfExists);
+    }
+
+    void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+            throws CatalogException, TableAlreadyExistException {
+        validateFlinkTable(table);
+
+        Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+        PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+        ImmutableMap.Builder<String, String> properties = 
ImmutableMap.builder();
+        String location = null;
+        for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+            if ("location".equalsIgnoreCase(entry.getKey())) {
+                location = entry.getValue();
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        try {
+            icebergCatalog.createTable(
+                    toIdentifier(tablePath),
+                    icebergSchema,
+                    spec,
+                    location,
+                    properties.build());
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(getName(), tablePath, e);
+            }
+        }
+    }
+
+    @Override
+    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+            throws CatalogException, TableNotExistException {
+        validateFlinkTable(newTable);
+
+        Table icebergTable;
+        try {
+            icebergTable = loadIcebergTable(tablePath);
+        } catch (TableNotExistException e) {
+            if (!ignoreIfNotExists) {
+                throw e;
+            } else {
+                return;
+            }
+        }
+
+        CatalogTable table = toCatalogTable(icebergTable);
+
+        // Currently, Flink SQL only support altering table properties.
+
+        // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by comparing
+        // CatalogTable instances, unless the Flink schema contains Iceberg 
column IDs.
+        if (!table.getSchema().equals(newTable.getSchema())) {
+            throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+        }
+
+        if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+            throw new UnsupportedOperationException("Altering partition keys 
is not supported yet.");
+        }
+
+        Map<String, String> oldProperties = table.getOptions();
+        Map<String, String> setProperties = Maps.newHashMap();
+
+        String setLocation = null;
+        String setSnapshotId = null;
+        String pickSnapshotId = null;
+
+        for (Map.Entry<String, String> entry : 
newTable.getOptions().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            if (Objects.equals(value, oldProperties.get(key))) {
+                continue;
+            }
+
+            if ("location".equalsIgnoreCase(key)) {
+                setLocation = value;
+            } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+                setSnapshotId = value;
+            } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+                pickSnapshotId = value;
+            } else {
+                setProperties.put(key, value);
+            }
+        }
+
+        oldProperties.keySet().forEach(k -> {
+            if (!newTable.getOptions().containsKey(k)) {
+                setProperties.put(k, null);
+            }
+        });
+
+        commitChanges(icebergTable, setLocation, setSnapshotId, 
pickSnapshotId, setProperties);
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new FlinkDynamicTableFactory(this));
+    }
+
+    CatalogLoader getCatalogLoader() {
+        return catalogLoader;
+    }
+
+    // ------------------------------ Unsupported methods 
---------------------------------------------
+
+    @Override
+    public List<String> listViews(String databaseName) throws CatalogException 
{
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition,
+            boolean ignoreIfExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition newPartition,
+            boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath) throws 
FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws 
CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createFunction(ObjectPath functionPath, CatalogFunction 
function, boolean ignoreIfExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterFunction(ObjectPath functionPath, CatalogFunction 
newFunction, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean 
ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics tableStatistics,
+            boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionStatistics(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics, boolean 
ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics, boolean 
ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+        Table table = loadIcebergTable(tablePath);
+
+        if (table.spec().isUnpartitioned()) {
+            throw new TableNotPartitionedException(icebergCatalog.name(), 
tablePath);
+        }
+
+        Set<CatalogPartitionSpec> set = Sets.newHashSet();
+        try (CloseableIterable<FileScanTask> tasks = 
table.newScan().planFiles()) {
+            for (DataFile dataFile : CloseableIterable.transform(tasks, 
FileScanTask::file)) {
+                Map<String, String> map = Maps.newHashMap();
+                StructLike structLike = dataFile.partition();
+                PartitionSpec spec = table.specs().get(dataFile.specId());
+                for (int i = 0; i < structLike.size(); i++) {
+                    map.put(spec.fields().get(i).name(), 
String.valueOf(structLike.get(i, Object.class)));
+                }
+                set.add(new CatalogPartitionSpec(map));
+            }
+        } catch (IOException e) {
+            throw new CatalogException(String.format("Failed to list 
partitions of table %s", tablePath), e);
+        }
+
+        return Lists.newArrayList(set);
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath 
tablePath, List<Expression> filters)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // After partition pruning and filter push down, the statistics have 
become very inaccurate, so the statistics from
+    // here are of little significance.
+    // Flink will support something like SupportsReportStatistics in future.
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath 
tablePath)
+            throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath 
tablePath,
+            CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
new file mode 100644
index 000000000..f4bb7d49f
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ * <p>
+ * This supports the following catalog configuration options:
+ * <ul>
+ *   <li><code>type</code> - Flink catalog factory key, should be 
"iceberg"</li>
+ *   <li><code>catalog-type</code> - iceberg catalog type, "hive" or 
"hadoop"</li>
+ *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
+ *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog 
only)</li>
+ *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
+ *   <li><code>default-database</code> - a database name to use as the 
default</li>
+ *   <li><code>base-namespace</code> - a base namespace as the prefix for all 
databases (Hadoop catalog only)</li>
+ *   <li><code>cache-enabled</code> - whether to enable catalog cache</li>
+ * </ul>
+ * </p>
+ * To use a custom catalog that is not a Hive or Hadoop catalog, extend this 
class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+    // Can not just use "type", it conflicts with CATALOG_TYPE.
+    public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+    public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+    public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public static final String DEFAULT_DATABASE = "default-database";
+    public static final String DEFAULT_DATABASE_NAME = "default";
+    public static final String BASE_NAMESPACE = "base-namespace";
+    public static final String CACHE_ENABLED = "cache-enabled";
+
+    public static final String TYPE = "type";
+    public static final String PROPERTY_VERSION = "property-version";
+
+    /**
+     * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to 
be used by this Flink catalog adapter.
+     *
+     * @param name Flink's catalog name
+     * @param properties Flink's catalog properties
+     * @param hadoopConf Hadoop configuration for catalog
+     * @return an Iceberg catalog loader
+     */
+    static CatalogLoader createCatalogLoader(String name, Map<String, String> 
properties, Configuration hadoopConf) {
+        String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
+        if (catalogImpl != null) {
+            String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+            Preconditions.checkArgument(catalogType == null,
+                    "Cannot create catalog %s, both catalog-type and "
+                            + "catalog-impl are set: catalog-type=%s, 
catalog-impl=%s",
+                    name, catalogType, catalogImpl);
+            return CatalogLoader.custom(name, properties, hadoopConf, 
catalogImpl);
+        }
+
+        String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, 
ICEBERG_CATALOG_TYPE_HIVE);
+        switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+            case ICEBERG_CATALOG_TYPE_HIVE:
+                // The values of properties 'uri', 'warehouse',
+                // 'hive-conf-dir' are allowed to be null, in that case it will
+                // fallback to parse those values from hadoop configuration 
which is loaded from classpath.
+                String hiveConfDir = properties.get(HIVE_CONF_DIR);
+                Configuration newHadoopConf = mergeHiveConf(hadoopConf, 
hiveConfDir);
+                return CatalogLoader.hive(name, newHadoopConf, properties);
+
+            case ICEBERG_CATALOG_TYPE_HADOOP:
+                return CatalogLoader.hadoop(name, hadoopConf, properties);
+
+            default:
+                throw new UnsupportedOperationException("Unknown catalog-type: 
" + catalogType
+                        + " (Must be 'hive' or 'hadoop')");
+        }
+    }
+
+    private static Configuration mergeHiveConf(Configuration hadoopConf, 
String hiveConfDir) {
+        Configuration newConf = new Configuration(hadoopConf);
+        if (!Strings.isNullOrEmpty(hiveConfDir)) {
+            Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, 
"hive-site.xml")),
+                    "There should be a hive-site.xml file under the directory 
%s", hiveConfDir);
+            newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+        } else {
+            // If don't provide the hive-site.xml path explicitly, it will try 
to load resource from classpath. If still
+            // couldn't load the configuration file, then it will throw 
exception in HiveCatalog.
+            URL configFile = 
CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+            if (configFile != null) {
+                newConf.addResource(configFile);
+            }
+        }
+        return newConf;
+    }
+
+    public static Configuration clusterHadoopConf() {
+        return 
HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> context = Maps.newHashMap();
+        context.put(TYPE, "iceberg");
+        context.put(PROPERTY_VERSION, "1");
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        return ImmutableList.of("*");
+    }
+
+    @Override
+    public Catalog createCatalog(String name, Map<String, String> properties) {
+        return createCatalog(name, properties, clusterHadoopConf());
+    }
+
+    protected Catalog createCatalog(String name, Map<String, String> 
properties, Configuration hadoopConf) {
+        CatalogLoader catalogLoader = createCatalogLoader(name, properties, 
hadoopConf);
+        String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, 
DEFAULT_DATABASE_NAME);
+
+        Namespace baseNamespace = Namespace.empty();
+        if (properties.containsKey(BASE_NAMESPACE)) {
+            baseNamespace = 
Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+        }
+
+        boolean cacheEnabled = 
Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
+        return new FlinkCatalog(name, defaultDatabase, baseNamespace, 
catalogLoader, cacheEnabled);
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
new file mode 100644
index 000000000..180eeb665
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkConfigOptions {
+
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+            ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("If is false, parallelism of source are 
set by config.\n"
+                            + "If is true, source parallelism is inferred 
according to splits number.\n");
+    public static final ConfigOption<Integer> 
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
+            
ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Sets max infer parallelism for source 
operator.");
+    public static final ConfigOption<Boolean> ICEBERG_IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Regard upsert delete as insert kind.");
+
+    private FlinkConfigOptions() {
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
new file mode 100644
index 000000000..80de87d78
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.IcebergTableSource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
+
+/**
+ * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.1
+ *
+ * <p>
+ * Factory for creating configured instances of {@link IcebergTableSource} and {@link
+ * IcebergTableSink}. We modify {@link IcebergTableSink} to support the append mode.
+ * </p>
+ */
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, 
DynamicTableSourceFactory {
+
+    static final String FACTORY_IDENTIFIER = "iceberg-inlong";
+
+    private static final ConfigOption<String> CATALOG_NAME =
+            ConfigOptions.key("catalog-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Catalog name");
+
+    private static final ConfigOption<String> CATALOG_TYPE =
+            ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Catalog type, the optional types are: 
custom, hadoop, hive.");
+
+    private static final ConfigOption<String> CATALOG_DATABASE =
+            ConfigOptions.key("catalog-database")
+                    .stringType()
+                    .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+                    .withDescription("Database name managed in the iceberg 
catalog.");
+
+    private static final ConfigOption<String> CATALOG_TABLE =
+            ConfigOptions.key("catalog-table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name managed in the underlying 
iceberg catalog and database.");
+
+    // Flink 1.13.x change the return type from CatalogTable interface to 
ResolvedCatalogTable which extends the
+    // CatalogTable. Here we use the dynamic method loading approach to avoid 
adding explicit CatalogTable or
+    // ResolvedCatalogTable class into the iceberg-flink-runtime jar for 
compatibility purpose.
+    private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = 
DynMethods.builder("getCatalogTable")
+            .impl(Context.class, "getCatalogTable")
+            .orNoop()
+            .build();
+
+    private final FlinkCatalog catalog;
+
+    public FlinkDynamicTableFactory() {
+        this.catalog = null;
+    }
+
+    public FlinkDynamicTableFactory(FlinkCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    private static CatalogTable loadCatalogTable(Context context) {
+        return GET_CATALOG_TABLE.invoke(context);
+    }
+
+    private static TableLoader createTableLoader(CatalogBaseTable 
catalogBaseTable,
+            Map<String, String> tableProps,
+            String databaseName,
+            String tableName) {
+        Configuration flinkConf = new Configuration();
+        tableProps.forEach(flinkConf::setString);
+
+        String catalogName = flinkConf.getString(CATALOG_NAME);
+        Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be 
null", CATALOG_NAME.key());
+
+        String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, 
databaseName);
+        Preconditions.checkNotNull(catalogDatabase, "The iceberg database name 
cannot be null");
+
+        String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+        Preconditions.checkNotNull(catalogTable, "The iceberg table name 
cannot be null");
+
+        org.apache.hadoop.conf.Configuration hadoopConf = 
FlinkCatalogFactory.clusterHadoopConf();
+        FlinkCatalogFactory factory = new FlinkCatalogFactory();
+        FlinkCatalog flinkCatalog = (FlinkCatalog) 
factory.createCatalog(catalogName, tableProps, hadoopConf);
+        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+        // Create database if not exists in the external catalog.
+        if (!flinkCatalog.databaseExists(catalogDatabase)) {
+            try {
+                flinkCatalog.createDatabase(catalogDatabase, new 
CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+            } catch (DatabaseAlreadyExistException e) {
+                throw new AlreadyExistsException(e, "Database %s already 
exists in the iceberg catalog %s.",
+                        catalogName,
+                        catalogDatabase);
+            }
+        }
+
+        // Create table if not exists in the external catalog.
+        if (!flinkCatalog.tableExists(objectPath)) {
+            try {
+                flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, 
true);
+            } catch (TableAlreadyExistException e) {
+                throw new AlreadyExistsException(e, "Table %s already exists 
in the database %s and catalog %s",
+                        catalogTable, catalogDatabase, catalogName);
+            }
+        }
+
+        return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(),
+                TableIdentifier.of(catalogDatabase, catalogTable));
+    }
+
+    private static TableLoader createTableLoader(FlinkCatalog catalog, 
ObjectPath objectPath) {
+        Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+        return TableLoader.fromCatalog(catalog.getCatalogLoader(), 
catalog.toIdentifier(objectPath));
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+        CatalogTable catalogTable = loadCatalogTable(context);
+        Map<String, String> tableProps = catalogTable.getOptions();
+        TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+        TableLoader tableLoader;
+        if (catalog != null) {
+            tableLoader = createTableLoader(catalog, 
objectIdentifier.toObjectPath());
+        } else {
+            tableLoader = createTableLoader(catalogTable, tableProps, 
objectIdentifier.getDatabaseName(),
+                    objectIdentifier.getObjectName());
+        }
+
+        return new IcebergTableSource(tableLoader, tableSchema, tableProps, 
context.getConfiguration());
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+        CatalogTable catalogTable = loadCatalogTable(context);
+        Map<String, String> tableProps = catalogTable.getOptions();
+        TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+        TableLoader tableLoader;
+        if (catalog != null) {
+            tableLoader = createTableLoader(catalog, objectPath);
+        } else {
+            tableLoader = createTableLoader(catalogTable, tableProps, 
objectPath.getDatabaseName(),
+                    objectPath.getObjectName());
+        }
+
+        return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = Sets.newHashSet();
+        options.add(CATALOG_TYPE);
+        options.add(CATALOG_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = Sets.newHashSet();
+        options.add(CATALOG_DATABASE);
+        options.add(CATALOG_TABLE);
+        options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
+        return options;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FACTORY_IDENTIFIER;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
new file mode 100644
index 000000000..ee5c5f65c
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
+
+public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning, SupportsOverwrite {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableSink.class);
+
+    private final TableLoader tableLoader;
+    private final TableSchema tableSchema;
+
+    private final CatalogTable catalogTable;
+
+    private boolean overwrite = false;
+
+    private IcebergTableSink(IcebergTableSink toCopy) {
+        this.tableLoader = toCopy.tableLoader;
+        this.tableSchema = toCopy.tableSchema;
+        this.overwrite = toCopy.overwrite;
+        this.catalogTable = toCopy.catalogTable;
+    }
+
+    public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, 
CatalogTable catalogTable) {
+        this.tableLoader = tableLoader;
+        this.tableSchema = tableSchema;
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        Preconditions.checkState(!overwrite || context.isBounded(),
+                "Unbounded data stream doesn't support overwrite operation.");
+
+        List<String> equalityColumns = tableSchema.getPrimaryKey()
+                .map(UniqueConstraint::getColumns)
+                .orElseGet(ImmutableList::of);
+
+        return (DataStreamSinkProvider) dataStream -> 
FlinkSink.forRowData(dataStream)
+                .tableLoader(tableLoader)
+                .tableSchema(tableSchema)
+                .equalityFieldColumns(equalityColumns)
+                .overwrite(overwrite)
+                .append();
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partition) {
+        // The flink's PartitionFanoutWriter will handle the static partition 
write policy automatically.
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if 
(org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+                .get(ICEBERG_IGNORE_ALL_CHANGELOG)) {
+            LOG.warn("Iceberg sink receive all changelog record. "
+                    + "Regard any other record as insert-only record.");
+            return ChangelogMode.all();
+        } else {
+            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            for (RowKind kind : requestedMode.getContainedKinds()) {
+                builder.addContainedKind(kind);
+            }
+            return builder.build();
+        }
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new IcebergTableSink(this);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Iceberg table sink";
+    }
+
+    @Override
+    public void applyOverwrite(boolean newOverwrite) {
+        this.overwrite = newOverwrite;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..8f9ff09fc
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 1a7b4a44a..9a86ef2ca 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -486,6 +486,14 @@
   Source  : flink-connector-hbase-2.2 1.13.5 (Please note that the software 
have been modified.)
   License : https://github.com/apache/flink/blob/master/LICENSE
 
+   1.3.6 
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+         
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+         
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
+         
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+         
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+    Source  : iceberg-flink-runtime-1.13 0.13.1 (Please note that the software has been modified.)
+    License : https://github.com/apache/iceberg/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 
@@ -756,6 +764,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
   org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apache Iceberg 
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
   org.apache.iceberg:iceberg-orc:0.13.1 - Apache Iceberg 
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
   org.apache.iceberg:iceberg-parquet:0.13.1 - Apache Iceberg 
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-flink-runtime-1.13:jar:0.13.1 - Apache Iceberg 
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
   org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson 
(https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13),
 (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2 - 
Jackson-dataformat-YAML 
(https://github.com/FasterXML/jackson-dataformat-yaml/tree/jackson-dataformat-yaml-2.13.2),
 (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2 - Jackson 
datatype: JSR310 
(https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.13.2),
 (The Apache Software License, Version 2.0)

Reply via email to