This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 86defcebee Flink: SQL support for dynamic iceberg sink (#15279)
86defcebee is described below
commit 86defcebeeff70c0621adae86d2cb9252ab6f31f
Author: Swapna Marru <[email protected]>
AuthorDate: Mon Feb 23 04:53:58 2026 -0800
Flink: SQL support for dynamic iceberg sink (#15279)
---
.../iceberg/flink/FlinkCreateTableOptions.java | 18 ++
.../iceberg/flink/FlinkDynamicTableFactory.java | 99 ++++++---
.../org/apache/iceberg/flink/IcebergTableSink.java | 233 +++++++++++++++------
.../sink/dynamic/DynamicTableRecordGenerator.java | 39 ++++
.../apache/iceberg/flink/TestIcebergConnector.java | 155 +++++++++++++-
5 files changed, 450 insertions(+), 94 deletions(-)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
index ab69ec5adc..0612260bfe 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
@@ -67,6 +67,24 @@ class FlinkCreateTableOptions {
.noDefaultValue()
.withDescription("Properties for the underlying catalog for iceberg
table.");
+ public static final ConfigOption<Boolean> USE_DYNAMIC_ICEBERG_SINK =
+ ConfigOptions.key("use-dynamic-iceberg-sink")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to use dynamic iceberg sink for routing data to
multiple tables. "
+ + "When enabled, a single sink instance can dynamically
route records to different "
+ + "Iceberg tables based on the logic defined in the
DynamicRecordGenerator implementation. "
+ + "Requires 'dynamic-record-generator-impl' to be specified.
"
+ + "Default is false (uses standard static sink behavior).");
+
+ public static final ConfigOption<String> DYNAMIC_RECORD_GENERATOR_IMPL =
+ ConfigOptions.key("dynamic-record-generator-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+          "Fully qualified class name of the DynamicTableRecordGenerator
implementation to use when use-dynamic-iceberg-sink is enabled.");
+
public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
public static final String CONNECTOR_PROPS_KEY = "connector";
public static final String LOCATION_KEY = "location";
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index bd79c11560..b2e2e33b92 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -88,26 +88,33 @@ public class FlinkDynamicTableFactory
ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
Map<String, String> writeProps = resolvedCatalogTable.getOptions();
- ResolvedSchema resolvedSchema =
- ResolvedSchema.of(
- resolvedCatalogTable.getResolvedSchema().getColumns().stream()
- .filter(Column::isPhysical)
- .collect(Collectors.toList()));
- TableLoader tableLoader;
- if (catalog != null) {
- tableLoader = createTableLoader(catalog,
objectIdentifier.toObjectPath());
+ Configuration flinkConf = new Configuration();
+ writeProps.forEach(flinkConf::setString);
+
+ boolean useDynamicSink =
flinkConf.get(FlinkCreateTableOptions.USE_DYNAMIC_ICEBERG_SINK);
+
+ if (useDynamicSink) {
+ return getIcebergTableSinkWithDynamicSinkProps(context, flinkConf,
writeProps);
} else {
- tableLoader =
- createTableLoader(
- resolvedCatalogTable,
- writeProps,
- objectIdentifier.getDatabaseName(),
- objectIdentifier.getObjectName());
- }
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog,
objectIdentifier.toObjectPath());
+ } else {
+ tableLoader =
+ createTableLoader(
+ resolvedCatalogTable,
+ writeProps,
+ objectIdentifier.getDatabaseName(),
+ objectIdentifier.getObjectName());
+ }
- return new IcebergTableSink(
- tableLoader, resolvedSchema, context.getConfiguration(), writeProps);
+ return new IcebergTableSink(
+ tableLoader,
+ resolvedCatalogTable.getResolvedSchema(),
+ context.getConfiguration(),
+ writeProps);
+ }
}
@Override
@@ -123,6 +130,8 @@ public class FlinkDynamicTableFactory
Set<ConfigOption<?>> options = Sets.newHashSet();
options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
options.add(FlinkCreateTableOptions.CATALOG_TABLE);
+ options.add(FlinkCreateTableOptions.USE_DYNAMIC_ICEBERG_SINK);
+ options.add(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
return options;
}
@@ -131,6 +140,46 @@ public class FlinkDynamicTableFactory
return FACTORY_IDENTIFIER;
}
+ private IcebergTableSink getIcebergTableSinkWithDynamicSinkProps(
+ Context context, Configuration flinkConf, Map<String, String>
writeProps) {
+ String dynamicRecordGeneratorImpl =
+ flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
+ Preconditions.checkArgument(
+ dynamicRecordGeneratorImpl != null,
+ "Invalid dynamic record generator value: null. %s must be specified
when use-dynamic-iceberg-sink is true.",
+ FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());
+
+ CatalogLoader catalogLoader;
+ if (catalog != null) {
+ catalogLoader = catalog.getCatalogLoader();
+ } else {
+ FlinkCatalog flinkCatalog =
+ createCatalogLoader(writeProps,
flinkConf.get(FlinkCreateTableOptions.CATALOG_NAME));
+ catalogLoader = flinkCatalog.getCatalogLoader();
+ }
+
+ ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+
+ return new IcebergTableSink(
+ catalogLoader,
+ dynamicRecordGeneratorImpl,
+ resolvedCatalogTable.getResolvedSchema(),
+ context.getConfiguration(),
+ writeProps);
+ }
+
+ private static FlinkCatalog createCatalogLoader(
+ Map<String, String> tableProps, String catalogName) {
+ Preconditions.checkArgument(
+ catalogName != null,
+ "Invalid catalog name: null. Set %s create table option.",
+        FlinkCreateTableOptions.CATALOG_NAME.key());
+
+ FlinkCatalogFactory factory = new FlinkCatalogFactory();
+ return (FlinkCatalog)
+ factory.createCatalog(catalogName, tableProps,
FlinkCatalogFactory.clusterHadoopConf());
+ }
+
private static TableLoader createTableLoader(
ResolvedCatalogTable resolvedCatalogTable,
Map<String, String> tableProps,
@@ -143,21 +192,16 @@ public class FlinkDynamicTableFactory
mergedProps.forEach(flinkConf::setString);
String catalogName = flinkConf.get(FlinkCreateTableOptions.CATALOG_NAME);
- Preconditions.checkNotNull(
- catalogName,
- "Table property '%s' cannot be null",
- FlinkCreateTableOptions.CATALOG_NAME.key());
String catalogDatabase =
flinkConf.get(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
- Preconditions.checkNotNull(catalogDatabase, "The iceberg database name
cannot be null");
+ Preconditions.checkArgument(
+ catalogDatabase != null,
+ "Invalid database name: null. Set %s create table option or specify
fully qualified table name.",
+        FlinkCreateTableOptions.CATALOG_DATABASE.key());
String catalogTable = flinkConf.get(FlinkCreateTableOptions.CATALOG_TABLE,
tableName);
- Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be
null");
- org.apache.hadoop.conf.Configuration hadoopConf =
FlinkCatalogFactory.clusterHadoopConf();
- FlinkCatalogFactory factory = new FlinkCatalogFactory();
- FlinkCatalog flinkCatalog =
- (FlinkCatalog) factory.createCatalog(catalogName, mergedProps,
hadoopConf);
+ FlinkCatalog flinkCatalog = createCatalogLoader(mergedProps, catalogName);
ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
// Create database if not exists in the external catalog.
@@ -229,7 +273,6 @@ public class FlinkDynamicTableFactory
}
private static TableLoader createTableLoader(FlinkCatalog catalog,
ObjectPath objectPath) {
- Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
return TableLoader.fromCatalog(catalog.getCatalogLoader(),
catalog.toIdentifier(objectPath));
}
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
index fa2fb89221..3fc3d093f6 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
@@ -20,7 +20,11 @@ package org.apache.iceberg.flink;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
@@ -28,15 +32,24 @@ import
org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink;
+import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;
+import org.apache.iceberg.flink.sink.dynamic.DynamicTableRecordGenerator;
+import org.apache.iceberg.flink.sink.dynamic.TableCreator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.PropertyUtil;
public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
+ private final CatalogLoader catalogLoader;
@SuppressWarnings("deprecation")
@Deprecated
@@ -45,16 +58,20 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
private final ResolvedSchema resolvedSchema;
private final ReadableConfig readableConfig;
private final Map<String, String> writeProps;
-
+ private final String dynamicRecordGeneratorImpl;
private boolean overwrite = false;
+ private boolean useDynamicSink = false;
private IcebergTableSink(IcebergTableSink toCopy) {
this.tableLoader = toCopy.tableLoader;
+ this.catalogLoader = toCopy.catalogLoader;
this.tableSchema = toCopy.tableSchema;
this.resolvedSchema = toCopy.resolvedSchema;
this.overwrite = toCopy.overwrite;
this.readableConfig = toCopy.readableConfig;
this.writeProps = toCopy.writeProps;
+ this.dynamicRecordGeneratorImpl = toCopy.dynamicRecordGeneratorImpl;
+ this.useDynamicSink = toCopy.useDynamicSink;
}
/**
@@ -68,10 +85,12 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
ReadableConfig readableConfig,
Map<String, String> writeProps) {
this.tableLoader = tableLoader;
+ this.catalogLoader = null;
this.tableSchema = tableSchema;
this.resolvedSchema = null;
this.readableConfig = readableConfig;
this.writeProps = writeProps;
+ this.dynamicRecordGeneratorImpl = null;
}
public IcebergTableSink(
@@ -80,10 +99,28 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
ReadableConfig readableConfig,
Map<String, String> writeProps) {
this.tableLoader = tableLoader;
+ this.catalogLoader = null;
this.tableSchema = null;
this.resolvedSchema = resolvedSchema;
this.readableConfig = readableConfig;
this.writeProps = writeProps;
+ this.dynamicRecordGeneratorImpl = null;
+ }
+
+ public IcebergTableSink(
+ CatalogLoader catalogLoader,
+ String dynamicRecordGeneratorImpl,
+ ResolvedSchema resolvedSchema,
+ ReadableConfig readableConfig,
+ Map<String, String> writeProps) {
+ this.tableLoader = null;
+ this.catalogLoader = catalogLoader;
+ this.dynamicRecordGeneratorImpl = dynamicRecordGeneratorImpl;
+ this.readableConfig = readableConfig;
+ this.writeProps = writeProps;
+ this.tableSchema = null;
+ this.resolvedSchema = resolvedSchema;
+ this.useDynamicSink = true;
}
@SuppressWarnings("deprecation")
@@ -93,66 +130,40 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
!overwrite || context.isBounded(),
"Unbounded data stream doesn't support overwrite operation.");
- if (resolvedSchema != null) {
- List<String> equalityColumns =
- resolvedSchema
- .getPrimaryKey()
- .map(UniqueConstraint::getColumns)
- .orElseGet(ImmutableList::of);
-
- return (DataStreamSinkProvider)
- (providerContext, dataStream) -> {
- if (Boolean.TRUE.equals(
-
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
- return IcebergSink.forRowData(dataStream)
- .tableLoader(tableLoader)
- .resolvedSchema(resolvedSchema)
- .equalityFieldColumns(equalityColumns)
- .overwrite(overwrite)
- .setAll(writeProps)
- .flinkConf(readableConfig)
- .append();
- } else {
- return FlinkSink.forRowData(dataStream)
- .tableLoader(tableLoader)
- .resolvedSchema(resolvedSchema)
- .equalityFieldColumns(equalityColumns)
- .overwrite(overwrite)
- .setAll(writeProps)
- .flinkConf(readableConfig)
- .append();
- }
- };
- } else {
- List<String> equalityColumns =
- tableSchema
- .getPrimaryKey()
-
.map(org.apache.flink.table.legacy.api.constraints.UniqueConstraint::getColumns)
- .orElseGet(ImmutableList::of);
-
- return (DataStreamSinkProvider)
- (providerContext, dataStream) -> {
- if
(readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
- return IcebergSink.forRowData(dataStream)
- .tableLoader(tableLoader)
- .tableSchema(tableSchema)
- .equalityFieldColumns(equalityColumns)
- .overwrite(overwrite)
- .setAll(writeProps)
- .flinkConf(readableConfig)
- .append();
- } else {
- return FlinkSink.forRowData(dataStream)
- .tableLoader(tableLoader)
- .tableSchema(tableSchema)
- .equalityFieldColumns(equalityColumns)
- .overwrite(overwrite)
- .setAll(writeProps)
- .flinkConf(readableConfig)
- .append();
- }
- };
- }
+ return (DataStreamSinkProvider)
+ (providerContext, dataStream) -> {
+ if (useDynamicSink) {
+ return createDynamicIcebergSink(dataStream);
+ }
+
+ ResolvedSchema physicalColumnsOnlySchema = null;
+ List<String> equalityColumns;
+ if (resolvedSchema != null) {
+ physicalColumnsOnlySchema =
+ ResolvedSchema.of(
+ resolvedSchema.getColumns().stream()
+ .filter(Column::isPhysical)
+ .collect(Collectors.toList()));
+
+ equalityColumns =
+ physicalColumnsOnlySchema
+ .getPrimaryKey()
+ .map(UniqueConstraint::getColumns)
+ .orElseGet(ImmutableList::of);
+ } else {
+ equalityColumns =
+ tableSchema
+ .getPrimaryKey()
+
.map(org.apache.flink.table.legacy.api.constraints.UniqueConstraint::getColumns)
+ .orElseGet(ImmutableList::of);
+ }
+
+ if
(readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
+ return createIcebergSink(dataStream, equalityColumns,
physicalColumnsOnlySchema);
+ } else {
+ return createLegacySink(dataStream, equalityColumns,
physicalColumnsOnlySchema);
+ }
+ };
}
@Override
@@ -184,4 +195,104 @@ public class IcebergTableSink implements
DynamicTableSink, SupportsPartitioning,
public void applyOverwrite(boolean newOverwrite) {
this.overwrite = newOverwrite;
}
+
+ private DataStreamSink<?> createLegacySink(
+ DataStream<RowData> dataStream,
+ List<String> equalityColumns,
+ ResolvedSchema physicalColumnsOnlySchema) {
+ FlinkSink.Builder builder =
+ FlinkSink.forRowData(dataStream)
+ .tableLoader(tableLoader)
+ .equalityFieldColumns(equalityColumns)
+ .overwrite(overwrite)
+ .setAll(writeProps)
+ .flinkConf(readableConfig);
+
+ if (physicalColumnsOnlySchema != null) {
+ builder = builder.resolvedSchema(physicalColumnsOnlySchema);
+ } else {
+ builder = builder.tableSchema(tableSchema);
+ }
+
+ return builder.append();
+ }
+
+ private DataStreamSink<?> createIcebergSink(
+ DataStream<RowData> dataStream,
+ List<String> equalityColumns,
+ ResolvedSchema physicalColumnsOnlySchema) {
+ IcebergSink.Builder builder =
+ IcebergSink.forRowData(dataStream)
+ .tableLoader(tableLoader)
+ .equalityFieldColumns(equalityColumns)
+ .overwrite(overwrite)
+ .setAll(writeProps)
+ .flinkConf(readableConfig);
+
+ if (physicalColumnsOnlySchema != null) {
+ builder = builder.resolvedSchema(physicalColumnsOnlySchema);
+ } else {
+ builder = builder.tableSchema(tableSchema);
+ }
+
+ return builder.append();
+ }
+
+ private DataStreamSink<?> createDynamicIcebergSink(DataStream<RowData>
dataStream) {
+ Preconditions.checkArgument(
+ catalogLoader != null && dynamicRecordGeneratorImpl != null,
+ "Invalid value catalogLoader: %s, DynamicRecordGenerator
Implementation class: %s. "
+ + "Both should be not null to use dynamic iceberg sink.",
+ catalogLoader,
+ dynamicRecordGeneratorImpl);
+
+ TableCreator tableCreator = createTableCreator();
+ DynamicRecordGenerator<RowData> generator =
+ createDynamicRecordGenerator(dynamicRecordGeneratorImpl);
+
+ DynamicIcebergSink.Builder<RowData> builder =
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(generator)
+ .catalogLoader(catalogLoader)
+ .setAll(writeProps)
+ .tableCreator(tableCreator)
+ .flinkConf(readableConfig);
+
+ return builder.append();
+ }
+
+ private TableCreator createTableCreator() {
+ final Map<String, String> tableProperties =
+ PropertyUtil.propertiesWithPrefix(writeProps, "table.props.");
+ final String location = writeProps.get("location");
+
+ return (catalog, identifier, schema, spec) ->
+ catalog
+ .buildTable(identifier, schema)
+ .withPartitionSpec(spec)
+ .withLocation(location)
+ .withProperties(tableProperties)
+ .create();
+ }
+
+ private DynamicTableRecordGenerator createDynamicRecordGenerator(String
generatorImpl) {
+ RowType rowType = (RowType)
resolvedSchema.toSourceRowDataType().getLogicalType();
+
+ DynConstructors.Ctor<DynamicTableRecordGenerator> ctor;
+
+ try {
+ ctor =
+ DynConstructors.builder(DynamicTableRecordGenerator.class)
+ .loader(IcebergTableSink.class.getClassLoader())
+ .impl(generatorImpl, RowType.class)
+ .buildChecked();
+ return ctor.newInstance(rowType);
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+          String.format("Class %s does not extend
DynamicTableRecordGenerator", generatorImpl), e);
+ } catch (Exception e) {
+ throw new RuntimeException(
+          String.format("Failed to instantiate DynamicTableRecordGenerator %s",
generatorImpl), e);
+ }
+ }
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java
new file mode 100644
index 0000000000..684da9bb54
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Abstract base class for SQL-based dynamic record generators. Users will
extend this class to
+ * create a DynamicRecord from RowData.
+ */
+public abstract class DynamicTableRecordGenerator implements
DynamicRecordGenerator<RowData> {
+
+ private final RowType rowType;
+
+ public DynamicTableRecordGenerator(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ protected RowType rowType() {
+ return rowType;
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 0a6d5e44ca..309b55c115 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -25,23 +25,37 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Map;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
+import org.apache.iceberg.flink.sink.dynamic.DynamicTableRecordGenerator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
@@ -70,16 +84,25 @@ public class TestIcebergConnector extends TestBase {
new Object[] {
"testhadoop",
ImmutableMap.of(
- "connector", "iceberg",
- "catalog-type", "hadoop"),
+ "connector",
+ "iceberg",
+ "catalog-type",
+ "hadoop",
+ "catalog-database",
+ "test_database"),
true
},
new Object[] {
"testhadoop",
ImmutableMap.of(
- "connector", "iceberg",
- "catalog-type", "hadoop",
- "catalog-table", "not_existing_table"),
+ "connector",
+ "iceberg",
+ "catalog-type",
+ "hadoop",
+ "catalog-database",
+ "test_database",
+ "catalog-table",
+ "not_existing_table"),
true
},
new Object[] {
@@ -328,4 +351,126 @@ public class TestIcebergConnector extends TestBase {
throw new UncheckedIOException(e);
}
}
+
+ @TestTemplate
+ public void testCreateDynamicIcebergSink() throws
DatabaseAlreadyExistException {
+ Map<String, String> tableProps = createTableProps();
+ Map<String, String> dynamicTableProps = Maps.newHashMap(tableProps);
+ dynamicTableProps.put("use-dynamic-iceberg-sink", "true");
+ dynamicTableProps.put(
+ "dynamic-record-generator-impl",
SimpleRowDataTableRecordGenerator.class.getName());
+ dynamicTableProps.put("table.props.key1", "val1");
+
+ FlinkCatalogFactory factory = new FlinkCatalogFactory();
+ FlinkCatalog flinkCatalog =
+ (FlinkCatalog) factory.createCatalog(catalogName, tableProps, new
Configuration());
+ flinkCatalog.createDatabase(
+ databaseName(), new CatalogDatabaseImpl(Maps.newHashMap(), null),
true);
+
+ // Create table with dynamic sink enabled
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, database_name STRING,
table_name STRING) WITH %s",
+ TABLE_NAME + "_dynamic", toWithClause(dynamicTableProps));
+
+ // Insert data with database and table information
+ sql(
+ "INSERT INTO %s VALUES (1, 'AAA', '%s', '%s'), (2, 'BBB', '%s', '%s'),
(3, 'CCC', '%s', '%s')",
+ TABLE_NAME + "_dynamic",
+ databaseName(),
+ tableName(),
+ databaseName(),
+ tableName(),
+ databaseName(),
+ tableName());
+
+ // Verify the table and data exists
+ ObjectPath objectPath = new ObjectPath(databaseName(), tableName());
+ assertThat(flinkCatalog.tableExists(objectPath)).isTrue();
+ Table table =
+ flinkCatalog
+ .getCatalogLoader()
+ .loadCatalog()
+ .loadTable(TableIdentifier.of(databaseName(), tableName()));
+ assertThat(table.properties()).containsEntry("key1", "val1");
+
+ tableProps.put("catalog-database", databaseName());
+ sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", tableName(),
toWithClause(tableProps));
+ assertThat(sql("SELECT * FROM %s", tableName()))
+ .containsExactlyInAnyOrder(Row.of(1L, "AAA"), Row.of(2L, "BBB"),
Row.of(3L, "CCC"));
+ }
+
+ @TestTemplate
+ public void testMissingDynamicRecordGeneratorImpl() throws
DatabaseAlreadyExistException {
+ Map<String, String> tableProps = createTableProps();
+ tableProps.put("use-dynamic-iceberg-sink", "true");
+
+ FlinkCatalogFactory factory = new FlinkCatalogFactory();
+ FlinkCatalog flinkCatalog =
+ (FlinkCatalog) factory.createCatalog(catalogName, tableProps, new
Configuration());
+ flinkCatalog.createDatabase(
+ databaseName(), new CatalogDatabaseImpl(Maps.newHashMap(), null),
true);
+
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, database_name STRING,
table_name STRING) WITH %s",
+ TABLE_NAME + "_dynamic", toWithClause(tableProps));
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "INSERT INTO %s VALUES (1, 'AAA', '%s', '%s')",
+ TABLE_NAME + "_dynamic", databaseName(), tableName()))
+ .cause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Invalid dynamic record generator value: null.
dynamic-record-generator-impl must be specified when use-dynamic-iceberg-sink
is true.");
+ }
+
+ public static class SimpleRowDataTableRecordGenerator extends
DynamicTableRecordGenerator {
+
+ private int databaseFieldIndex = -1;
+ private int tableFieldIndex = -1;
+
+ public SimpleRowDataTableRecordGenerator(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public void open(OpenContext openContext) throws Exception {
+ String[] fieldNames = rowType().getFieldNames().toArray(new String[0]);
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ if ("database_name".equals(fieldNames[i])) {
+ databaseFieldIndex = i;
+ } else if ("table_name".equals(fieldNames[i])) {
+ tableFieldIndex = i;
+ }
+ }
+ }
+
+ @Override
+ public void generate(RowData inputRecord, Collector<DynamicRecord> out)
throws Exception {
+ // Extract database and table names using the discovered field indexes
+ String databaseName =
inputRecord.getString(databaseFieldIndex).toString();
+ String tableName = inputRecord.getString(tableFieldIndex).toString();
+
+ // Create schema for the actual data fields (excluding metadata fields)
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.required(1, "data", Types.StringType.get()));
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(databaseName,
tableName);
+
+ DynamicRecord dynamicRecord =
+ new DynamicRecord(
+ tableIdentifier,
+ "main",
+ schema,
+ inputRecord,
+ PartitionSpec.unpartitioned(),
+ DistributionMode.NONE,
+ 1);
+ out.collect(dynamicRecord);
+ }
+ }
}