This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bcaebe3af [INLONG-5131][Sort] Import all changelog mode data ingest
into Iceberg or DLC (#5141)
bcaebe3af is described below
commit bcaebe3afd44af8279d55f203fbfad27afad797a
Author: Oneal65 <[email protected]>
AuthorDate: Tue Jul 26 12:16:22 2022 +0800
[INLONG-5131][Sort] Import all changelog mode data ingest into Iceberg or
DLC (#5141)
---
.../protocol/node/load/DLCIcebergLoadNode.java | 4 +-
.../sort/protocol/node/load/IcebergLoadNode.java | 4 +-
inlong-sort/sort-connectors/iceberg-dlc/pom.xml | 5 +-
.../apache/inlong/sort/iceberg/FlinkCatalog.java | 731 +++++++++++++++++++++
.../inlong/sort/iceberg/FlinkCatalogFactory.java | 166 +++++
.../inlong/sort/iceberg/FlinkConfigOptions.java | 47 ++
.../sort/iceberg/FlinkDynamicTableFactory.java | 218 ++++++
.../inlong/sort/iceberg/IcebergTableSink.java | 119 ++++
.../org.apache.flink.table.factories.Factory | 16 +
licenses/inlong-sort-connectors/LICENSE | 9 +
10 files changed, 1315 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
index 41891ad8f..2d96febc5 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
@@ -90,7 +90,9 @@ public class DLCIcebergLoadNode extends LoadNode implements
Serializable {
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "iceberg");
+ options.put("connector", "iceberg-inlong");
+ // for test sink.ignore.changelog
+ // options.put("sink.ignore.changelog", "true");
options.put("catalog-database", dbName);
options.put("catalog-table", tableName);
options.put("default-database", dbName);
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 2d649c288..3752c978c 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -94,7 +94,9 @@ public class IcebergLoadNode extends LoadNode implements
Serializable {
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "iceberg");
+ options.put("connector", "iceberg-inlong");
+ // for test sink.ignore.changelog
+ // options.put("sink.ignore.changelog", "true");
options.put("catalog-database", dbName);
options.put("catalog-table", tableName);
options.put("default-database", dbName);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
index a672631d0..0e1e9f571 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
@@ -88,8 +88,9 @@
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-runtime-1.13</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-iceberg</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
new file mode 100644
index 000000000..2056a8d02
--- /dev/null
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ * <p>
+ * The mapping between Flink database and Iceberg namespace:
+ * Supplying a base namespace for a given catalog, so if you have a catalog
that supports a 2-level namespace, you
+ * would supply the first level in the catalog configuration and the second
level would be exposed as Flink databases.
+ * </p>
+ * The Iceberg table manages its partitions by itself. The partition of the
Iceberg table is independent of the
+ * partition of Flink.
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+ private final CatalogLoader catalogLoader;
+ private final Catalog icebergCatalog;
+ private final Namespace baseNamespace;
+ private final SupportsNamespaces asNamespaceCatalog;
+ private final Closeable closeable;
+ private final boolean cacheEnabled;
+
+ public FlinkCatalog(
+ String catalogName,
+ String defaultDatabase,
+ Namespace baseNamespace,
+ CatalogLoader catalogLoader,
+ boolean cacheEnabled) {
+ super(catalogName, defaultDatabase);
+ this.catalogLoader = catalogLoader;
+ this.baseNamespace = baseNamespace;
+ this.cacheEnabled = cacheEnabled;
+
+ Catalog originalCatalog = catalogLoader.loadCatalog();
+ icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) :
originalCatalog;
+ asNamespaceCatalog =
+ originalCatalog instanceof SupportsNamespaces ?
(SupportsNamespaces) originalCatalog : null;
+ closeable = originalCatalog instanceof Closeable ? (Closeable)
originalCatalog : null;
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(table instanceof CatalogTable, "The Table
should be a CatalogTable.");
+
+ TableSchema schema = table.getSchema();
+ schema.getTableColumns().forEach(column -> {
+ if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
+ throw new UnsupportedOperationException("Creating table with
computed columns is not supported yet.");
+ }
+ });
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ throw new UnsupportedOperationException("Creating table with
watermark specs is not supported yet.");
+ }
+ }
+
+ private static PartitionSpec toPartitionSpec(List<String> partitionKeys,
Schema icebergSchema) {
+ PartitionSpec.Builder builder =
PartitionSpec.builderFor(icebergSchema);
+ partitionKeys.forEach(builder::identity);
+ return builder.build();
+ }
+
+ private static List<String> toPartitionKeys(PartitionSpec spec, Schema
icebergSchema) {
+ List<String> partitionKeys = Lists.newArrayList();
+ for (PartitionField field : spec.fields()) {
+ if (field.transform().isIdentity()) {
+
partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+ } else {
+ // Not created by Flink SQL.
+ // For compatibility with iceberg tables, return empty.
+ // TODO modify this after Flink support partition transform.
+ return Collections.emptyList();
+ }
+ }
+ return partitionKeys;
+ }
+
+ private static void commitChanges(Table table, String setLocation, String
setSnapshotId,
+ String pickSnapshotId, Map<String, String> setProperties) {
+ // don't allow setting the snapshot and picking a commit
+ // at the same time because order is ambiguous and choosing
+ // one order leads to different results
+ Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId ==
null,
+ "Cannot set the current snapshot ID and cherry-pick snapshot
changes");
+
+ if (setSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(setSnapshotId);
+ table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+ }
+
+ // if updating the table snapshot, perform that update first in case
it fails
+ if (pickSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(pickSnapshotId);
+ table.manageSnapshots().cherrypick(newSnapshotId).commit();
+ }
+
+ Transaction transaction = table.newTransaction();
+
+ if (setLocation != null) {
+ transaction.updateLocation()
+ .setLocation(setLocation)
+ .commit();
+ }
+
+ if (!setProperties.isEmpty()) {
+ UpdateProperties updateProperties = transaction.updateProperties();
+ setProperties.forEach((k, v) -> {
+ if (v == null) {
+ updateProperties.remove(k);
+ } else {
+ updateProperties.set(k, v);
+ }
+ });
+ updateProperties.commit();
+ }
+
+ transaction.commitTransaction();
+ }
+
+ static CatalogTable toCatalogTable(Table table) {
+ TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+ List<String> partitionKeys = toPartitionKeys(table.spec(),
table.schema());
+
+ // NOTE: We can not create a IcebergCatalogTable extends CatalogTable,
because Flink optimizer may use
+ // CatalogTableImpl to copy a new catalog table.
+ // Let's re-loading table from Iceberg catalog when creating
source/sink operators.
+ // Iceberg does not have Table comment, so pass a null (Default
comment value in Flink).
+ return new CatalogTableImpl(schema, partitionKeys, table.properties(),
null);
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ // Create the default database if it does not exist.
+ try {
+ createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
+ } catch (DatabaseAlreadyExistException e) {
+ // Ignore the exception if it's already exist.
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ throw new CatalogException(e);
+ }
+ }
+ }
+
+ public Catalog catalog() {
+ return icebergCatalog;
+ }
+
+ private Namespace toNamespace(String database) {
+ String[] namespace = new String[baseNamespace.levels().length + 1];
+ System.arraycopy(baseNamespace.levels(), 0, namespace, 0,
baseNamespace.levels().length);
+ namespace[baseNamespace.levels().length] = database;
+ return Namespace.of(namespace);
+ }
+
+ TableIdentifier toIdentifier(ObjectPath path) {
+ return TableIdentifier.of(toNamespace(path.getDatabaseName()),
path.getObjectName());
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ if (asNamespaceCatalog == null) {
+ return Collections.singletonList(getDefaultDatabase());
+ }
+
+ return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
+ .map(n -> n.level(n.levels().length - 1))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog == null) {
+ if (!getDefaultDatabase().equals(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } else {
+ return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+ }
+ } else {
+ try {
+ Map<String, String> metadata =
+
Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
+ String comment = metadata.remove("comment");
+ return new CatalogDatabaseImpl(metadata, comment);
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName,
e);
+ }
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ try {
+ getDatabase(databaseName);
+ return true;
+ } catch (DatabaseNotExistException ignore) {
+ return false;
+ }
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ createDatabase(name, mergeComment(database.getProperties(),
database.getComment()), ignoreIfExists);
+ }
+
+ private void createDatabase(String databaseName, Map<String, String>
metadata, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ asNamespaceCatalog.createNamespace(toNamespace(databaseName),
metadata);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(getName(),
databaseName, e);
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException("Namespaces are not
supported by catalog: " + getName());
+ }
+ }
+
+ private Map<String, String> mergeComment(Map<String, String> metadata,
String comment) {
+ Map<String, String> ret = Maps.newHashMap(metadata);
+ if (metadata.containsKey("comment")) {
+ throw new CatalogException("Database properties should not contain
key: 'comment'.");
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ ret.put("comment", comment);
+ }
+ return ret;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ boolean success =
asNamespaceCatalog.dropNamespace(toNamespace(name));
+ if (!success && !ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ } catch (NamespaceNotEmptyException e) {
+ throw new DatabaseNotEmptyException(getName(), name, e);
+ }
+ } else {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase,
boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ Namespace namespace = toNamespace(name);
+ Map<String, String> updates = Maps.newHashMap();
+ Set<String> removals = Sets.newHashSet();
+
+ try {
+ Map<String, String> oldProperties =
asNamespaceCatalog.loadNamespaceMetadata(namespace);
+ Map<String, String> newProperties =
mergeComment(newDatabase.getProperties(), newDatabase.getComment());
+
+ for (String key : oldProperties.keySet()) {
+ if (!newProperties.containsKey(key)) {
+ removals.add(key);
+ }
+ }
+
+ for (Map.Entry<String, String> entry :
newProperties.entrySet()) {
+ if
(!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
+ updates.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (!updates.isEmpty()) {
+ asNamespaceCatalog.setProperties(namespace, updates);
+ }
+
+ if (!removals.isEmpty()) {
+ asNamespaceCatalog.removeProperties(namespace, removals);
+ }
+
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ }
+ } else {
+ if (getDefaultDatabase().equals(name)) {
+ throw new CatalogException(
+ "Can not alter the default database when the iceberg
catalog doesn't support namespaces.");
+ }
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ try {
+ return
icebergCatalog.listTables(toNamespace(databaseName)).stream()
+ .map(TableIdentifier::name)
+ .collect(Collectors.toList());
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+
+ @Override
+ public CatalogTable getTable(ObjectPath tablePath) throws
TableNotExistException, CatalogException {
+ Table table = loadIcebergTable(tablePath);
+ return toCatalogTable(table);
+ }
+
+ private Table loadIcebergTable(ObjectPath tablePath) throws
TableNotExistException {
+ try {
+ Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+ if (cacheEnabled) {
+ table.refresh();
+ }
+
+ return table;
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return icebergCatalog.tableExists(toIdentifier(tablePath));
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ icebergCatalog.dropTable(toIdentifier(tablePath));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException,
CatalogException {
+ try {
+ icebergCatalog.renameTable(
+ toIdentifier(tablePath),
+ toIdentifier(new ObjectPath(tablePath.getDatabaseName(),
newTableName)));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws CatalogException, TableAlreadyExistException {
+ if (Objects.equals(table.getOptions().get("connector"),
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+ throw new IllegalArgumentException("Cannot create the table with
'connector'='iceberg' table property in "
+ + "an iceberg catalog, Please create table with
'connector'='iceberg' "
+ + "property in a non-iceberg catalog "
+ + "or create table without 'connector'='iceberg' related
properties in an iceberg table.");
+ }
+
+ createIcebergTable(tablePath, table, ignoreIfExists);
+ }
+
+ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable)
table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties =
ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable,
boolean ignoreIfNotExists)
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+
+ Table icebergTable;
+ try {
+ icebergTable = loadIcebergTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ } else {
+ return;
+ }
+ }
+
+ CatalogTable table = toCatalogTable(icebergTable);
+
+ // Currently, Flink SQL only support altering table properties.
+
+ // For current Flink Catalog API, support for adding/removing/renaming
columns cannot be done by comparing
+ // CatalogTable instances, unless the Flink schema contains Iceberg
column IDs.
+ if (!table.getSchema().equals(newTable.getSchema())) {
+ throw new UnsupportedOperationException("Altering schema is not
supported yet.");
+ }
+
+ if (!table.getPartitionKeys().equals(((CatalogTable)
newTable).getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys
is not supported yet.");
+ }
+
+ Map<String, String> oldProperties = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry :
newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldProperties.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldProperties.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId,
pickSnapshotId, setProperties);
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new FlinkDynamicTableFactory(this));
+ }
+
+ CatalogLoader getCatalogLoader() {
+ return catalogLoader;
+ }
+
+ // ------------------------------ Unsupported methods
---------------------------------------------
+
+ @Override
+ public List<String> listViews(String databaseName) throws CatalogException
{
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition partition,
+ boolean ignoreIfExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition newPartition,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws
FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws
CatalogException {
+ return false;
+ }
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction
function, boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction
newFunction, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath,
CatalogTableStatistics tableStatistics,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath,
CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics, boolean
ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics, boolean
ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ Table table = loadIcebergTable(tablePath);
+
+ if (table.spec().isUnpartitioned()) {
+ throw new TableNotPartitionedException(icebergCatalog.name(),
tablePath);
+ }
+
+ Set<CatalogPartitionSpec> set = Sets.newHashSet();
+ try (CloseableIterable<FileScanTask> tasks =
table.newScan().planFiles()) {
+ for (DataFile dataFile : CloseableIterable.transform(tasks,
FileScanTask::file)) {
+ Map<String, String> map = Maps.newHashMap();
+ StructLike structLike = dataFile.partition();
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ for (int i = 0; i < structLike.size(); i++) {
+ map.put(spec.fields().get(i).name(),
String.valueOf(structLike.get(i, Object.class)));
+ }
+ set.add(new CatalogPartitionSpec(map));
+ }
+ } catch (IOException e) {
+ throw new CatalogException(String.format("Failed to list
partitions of table %s", tablePath), e);
+ }
+
+ return Lists.newArrayList(set);
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath
tablePath, List<Expression> filters)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // After partition pruning and filter push down, the statistics have
become very inaccurate, so the statistics from
+ // here are of little significance.
+ // Flink will support something like SupportsReportStatistics in future.
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath
tablePath)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath
tablePath,
+ CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
new file mode 100644
index 000000000..f4bb7d49f
--- /dev/null
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ * <p>
+ * This supports the following catalog configuration options:
+ * <ul>
+ * <li><code>type</code> - Flink catalog factory key, should be
"iceberg"</li>
+ * <li><code>catalog-type</code> - iceberg catalog type, "hive" or
"hadoop"</li>
+ * <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
+ * <li><code>clients</code> - the Hive Client Pool Size (Hive catalog
only)</li>
+ * <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
+ * <li><code>default-database</code> - a database name to use as the
default</li>
+ * <li><code>base-namespace</code> - a base namespace as the prefix for all
databases (Hadoop catalog only)</li>
+ * <li><code>cache-enabled</code> - whether to enable catalog cache</li>
+ * </ul>
+ * </p>
+ * To use a custom catalog that is not a Hive or Hadoop catalog, extend this
class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ */
public class FlinkCatalogFactory implements CatalogFactory {

    // Can not just use "type", it conflicts with CATALOG_TYPE.
    public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
    public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
    public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";

    // Optional catalog properties understood by this factory.
    public static final String HIVE_CONF_DIR = "hive-conf-dir";
    public static final String DEFAULT_DATABASE = "default-database";
    public static final String DEFAULT_DATABASE_NAME = "default";
    public static final String BASE_NAMESPACE = "base-namespace";
    public static final String CACHE_ENABLED = "cache-enabled";

    // Keys required by Flink's legacy CatalogFactory discovery mechanism.
    public static final String TYPE = "type";
    public static final String PROPERTY_VERSION = "property-version";

    /**
     * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
     *
     * @param name Flink's catalog name
     * @param properties Flink's catalog properties
     * @param hadoopConf Hadoop configuration for catalog
     * @return an Iceberg catalog loader
     */
    static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
        // A custom catalog implementation wins over catalog-type, and the two are mutually exclusive.
        String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
        if (catalogImpl != null) {
            String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
            Preconditions.checkArgument(catalogType == null,
                    "Cannot create catalog %s, both catalog-type and "
                            + "catalog-impl are set: catalog-type=%s, catalog-impl=%s",
                    name, catalogType, catalogImpl);
            return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
        }

        // Without an explicit catalog-type, fall back to a Hive catalog.
        String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
        switch (catalogType.toLowerCase(Locale.ENGLISH)) {
            case ICEBERG_CATALOG_TYPE_HIVE:
                // The values of properties 'uri', 'warehouse',
                // 'hive-conf-dir' are allowed to be null, in that case it will
                // fallback to parse those values from hadoop configuration which is loaded from classpath.
                String hiveConfDir = properties.get(HIVE_CONF_DIR);
                Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
                return CatalogLoader.hive(name, newHadoopConf, properties);

            case ICEBERG_CATALOG_TYPE_HADOOP:
                return CatalogLoader.hadoop(name, hadoopConf, properties);

            default:
                throw new UnsupportedOperationException("Unknown catalog-type: " + catalogType
                        + " (Must be 'hive' or 'hadoop')");
        }
    }

    /**
     * Layer the hive-site.xml configuration on top of the given Hadoop configuration.
     * When {@code hiveConfDir} is set it must contain a hive-site.xml file; otherwise the
     * file is looked up on the classpath and silently skipped when absent.
     */
    private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
        Configuration newConf = new Configuration(hadoopConf);
        if (!Strings.isNullOrEmpty(hiveConfDir)) {
            Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
                    "There should be a hive-site.xml file under the directory %s", hiveConfDir);
            newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
        } else {
            // If don't provide the hive-site.xml path explicitly, it will try to load resource from classpath. If still
            // couldn't load the configuration file, then it will throw exception in HiveCatalog.
            URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
            if (configFile != null) {
                newConf.addResource(configFile);
            }
        }
        return newConf;
    }

    /** Hadoop configuration derived from the Flink cluster's own (flink-conf.yaml) configuration. */
    public static Configuration clusterHadoopConf() {
        return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
    }

    @Override
    public Map<String, String> requiredContext() {
        // NOTE(review): this registers the legacy catalog type "iceberg"; if the stock Apache
        // Iceberg FlinkCatalogFactory is also on the classpath the two factories may clash —
        // confirm the intended shading/packaging.
        Map<String, String> context = Maps.newHashMap();
        context.put(TYPE, "iceberg");
        context.put(PROPERTY_VERSION, "1");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // Accept any property; validation is deferred to the catalog loader.
        return ImmutableList.of("*");
    }

    @Override
    public Catalog createCatalog(String name, Map<String, String> properties) {
        return createCatalog(name, properties, clusterHadoopConf());
    }

    /**
     * Build the {@link FlinkCatalog} adapter: resolves the default database, the optional
     * base namespace prefix (dot-separated), and whether table caching is enabled
     * (defaults to true).
     */
    protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
        CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
        String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);

        Namespace baseNamespace = Namespace.empty();
        if (properties.containsKey(BASE_NAMESPACE)) {
            baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
        }

        boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
        return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
    }
}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
new file mode 100644
index 000000000..180eeb665
--- /dev/null
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkConfigOptions {
+
+ public static final ConfigOption<Boolean>
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+ ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("If is false, parallelism of source are
set by config.\n"
+ + "If is true, source parallelism is inferred
according to splits number.\n");
+ public static final ConfigOption<Integer>
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
+
ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Sets max infer parallelism for source
operator.");
+ public static final ConfigOption<Boolean> ICEBERG_IGNORE_ALL_CHANGELOG =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Regard upsert delete as insert kind.");
+
+ private FlinkConfigOptions() {
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
new file mode 100644
index 000000000..80de87d78
--- /dev/null
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.IcebergTableSource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
+
+/**
+ * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.1
+ *
+ * <p>
+ * Factory for creating configured instances of {@link IcebergTableSource} and {@link
+ * IcebergTableSink}. We modify IcebergTableSink to support append-mode.
+ * </p>
+ */
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory,
DynamicTableSourceFactory {
+
+ static final String FACTORY_IDENTIFIER = "iceberg-inlong";
+
+ private static final ConfigOption<String> CATALOG_NAME =
+ ConfigOptions.key("catalog-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog name");
+
+ private static final ConfigOption<String> CATALOG_TYPE =
+ ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog type, the optional types are:
custom, hadoop, hive.");
+
+ private static final ConfigOption<String> CATALOG_DATABASE =
+ ConfigOptions.key("catalog-database")
+ .stringType()
+ .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+ .withDescription("Database name managed in the iceberg
catalog.");
+
+ private static final ConfigOption<String> CATALOG_TABLE =
+ ConfigOptions.key("catalog-table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name managed in the underlying
iceberg catalog and database.");
+
+ // Flink 1.13.x change the return type from CatalogTable interface to
ResolvedCatalogTable which extends the
+ // CatalogTable. Here we use the dynamic method loading approach to avoid
adding explicit CatalogTable or
+ // ResolvedCatalogTable class into the iceberg-flink-runtime jar for
compatibility purpose.
+ private static final DynMethods.UnboundMethod GET_CATALOG_TABLE =
DynMethods.builder("getCatalogTable")
+ .impl(Context.class, "getCatalogTable")
+ .orNoop()
+ .build();
+
+ private final FlinkCatalog catalog;
+
+ public FlinkDynamicTableFactory() {
+ this.catalog = null;
+ }
+
+ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
+ this.catalog = catalog;
+ }
+
+ private static CatalogTable loadCatalogTable(Context context) {
+ return GET_CATALOG_TABLE.invoke(context);
+ }
+
+ private static TableLoader createTableLoader(CatalogBaseTable
catalogBaseTable,
+ Map<String, String> tableProps,
+ String databaseName,
+ String tableName) {
+ Configuration flinkConf = new Configuration();
+ tableProps.forEach(flinkConf::setString);
+
+ String catalogName = flinkConf.getString(CATALOG_NAME);
+ Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be
null", CATALOG_NAME.key());
+
+ String catalogDatabase = flinkConf.getString(CATALOG_DATABASE,
databaseName);
+ Preconditions.checkNotNull(catalogDatabase, "The iceberg database name
cannot be null");
+
+ String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+ Preconditions.checkNotNull(catalogTable, "The iceberg table name
cannot be null");
+
+ org.apache.hadoop.conf.Configuration hadoopConf =
FlinkCatalogFactory.clusterHadoopConf();
+ FlinkCatalogFactory factory = new FlinkCatalogFactory();
+ FlinkCatalog flinkCatalog = (FlinkCatalog)
factory.createCatalog(catalogName, tableProps, hadoopConf);
+ ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+ // Create database if not exists in the external catalog.
+ if (!flinkCatalog.databaseExists(catalogDatabase)) {
+ try {
+ flinkCatalog.createDatabase(catalogDatabase, new
CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+ } catch (DatabaseAlreadyExistException e) {
+ throw new AlreadyExistsException(e, "Database %s already
exists in the iceberg catalog %s.",
+ catalogName,
+ catalogDatabase);
+ }
+ }
+
+ // Create table if not exists in the external catalog.
+ if (!flinkCatalog.tableExists(objectPath)) {
+ try {
+ flinkCatalog.createIcebergTable(objectPath, catalogBaseTable,
true);
+ } catch (TableAlreadyExistException e) {
+ throw new AlreadyExistsException(e, "Table %s already exists
in the database %s and catalog %s",
+ catalogTable, catalogDatabase, catalogName);
+ }
+ }
+
+ return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(),
+ TableIdentifier.of(catalogDatabase, catalogTable));
+ }
+
+ private static TableLoader createTableLoader(FlinkCatalog catalog,
ObjectPath objectPath) {
+ Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+ return TableLoader.fromCatalog(catalog.getCatalogLoader(),
catalog.toIdentifier(objectPath));
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+ CatalogTable catalogTable = loadCatalogTable(context);
+ Map<String, String> tableProps = catalogTable.getOptions();
+ TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog,
objectIdentifier.toObjectPath());
+ } else {
+ tableLoader = createTableLoader(catalogTable, tableProps,
objectIdentifier.getDatabaseName(),
+ objectIdentifier.getObjectName());
+ }
+
+ return new IcebergTableSource(tableLoader, tableSchema, tableProps,
context.getConfiguration());
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+ CatalogTable catalogTable = loadCatalogTable(context);
+ Map<String, String> tableProps = catalogTable.getOptions();
+ TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog, objectPath);
+ } else {
+ tableLoader = createTableLoader(catalogTable, tableProps,
objectPath.getDatabaseName(),
+ objectPath.getObjectName());
+ }
+
+ return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = Sets.newHashSet();
+ options.add(CATALOG_TYPE);
+ options.add(CATALOG_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = Sets.newHashSet();
+ options.add(CATALOG_DATABASE);
+ options.add(CATALOG_TABLE);
+ options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
+ return options;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_IDENTIFIER;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
new file mode 100644
index 000000000..ee5c5f65c
--- /dev/null
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
+
+public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning, SupportsOverwrite {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergTableSink.class);
+
+ private final TableLoader tableLoader;
+ private final TableSchema tableSchema;
+
+ private final CatalogTable catalogTable;
+
+ private boolean overwrite = false;
+
+ private IcebergTableSink(IcebergTableSink toCopy) {
+ this.tableLoader = toCopy.tableLoader;
+ this.tableSchema = toCopy.tableSchema;
+ this.overwrite = toCopy.overwrite;
+ this.catalogTable = toCopy.catalogTable;
+ }
+
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema,
CatalogTable catalogTable) {
+ this.tableLoader = tableLoader;
+ this.tableSchema = tableSchema;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions.checkState(!overwrite || context.isBounded(),
+ "Unbounded data stream doesn't support overwrite operation.");
+
+ List<String> equalityColumns = tableSchema.getPrimaryKey()
+ .map(UniqueConstraint::getColumns)
+ .orElseGet(ImmutableList::of);
+
+ return (DataStreamSinkProvider) dataStream ->
FlinkSink.forRowData(dataStream)
+ .tableLoader(tableLoader)
+ .tableSchema(tableSchema)
+ .equalityFieldColumns(equalityColumns)
+ .overwrite(overwrite)
+ .append();
+ }
+
+ @Override
+ public void applyStaticPartition(Map<String, String> partition) {
+ // The flink's PartitionFanoutWriter will handle the static partition
write policy automatically.
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ if
(org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+ .get(ICEBERG_IGNORE_ALL_CHANGELOG)) {
+ LOG.warn("Iceberg sink receive all changelog record. "
+ + "Regard any other record as insert-only record.");
+ return ChangelogMode.all();
+ } else {
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ builder.addContainedKind(kind);
+ }
+ return builder.build();
+ }
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new IcebergTableSink(this);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Iceberg table sink";
+ }
+
+ @Override
+ public void applyOverwrite(boolean newOverwrite) {
+ this.overwrite = newOverwrite;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..8f9ff09fc
--- /dev/null
+++
b/inlong-sort/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 1a7b4a44a..9a86ef2ca 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -486,6 +486,14 @@
Source : flink-connector-hbase-2.2 1.13.5 (Please note that the software
have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
+ 1.3.6
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
+
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+ Source : iceberg-flink-runtime-1.13 0.13.1 (Please note that the software
+ has been modified.)
+ License : https://github.com/apache/iceberg/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
@@ -756,6 +764,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apache Iceberg
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
org.apache.iceberg:iceberg-orc:0.13.1 - Apache Iceberg
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
org.apache.iceberg:iceberg-parquet:0.13.1 - Apache Iceberg
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-flink-runtime-1.13:jar:0.13.1 - Apache Iceberg
(https://iceberg.apache.org), (The Apache Software License, Version 2.0)
org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson
(https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13),
(The Apache Software License, Version 2.0)
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2 -
Jackson-dataformat-YAML
(https://github.com/FasterXML/jackson-dataformat-yaml/tree/jackson-dataformat-yaml-2.13.2),
(The Apache Software License, Version 2.0)
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2 - Jackson
datatype: JSR310
(https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.13.2),
(The Apache Software License, Version 2.0)