This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 86defcebee Flink: SQL support for dynamic iceberg sink (#15279)
86defcebee is described below

commit 86defcebeeff70c0621adae86d2cb9252ab6f31f
Author: Swapna Marru <[email protected]>
AuthorDate: Mon Feb 23 04:53:58 2026 -0800

    Flink: SQL support for dynamic iceberg sink (#15279)
---
 .../iceberg/flink/FlinkCreateTableOptions.java     |  18 ++
 .../iceberg/flink/FlinkDynamicTableFactory.java    |  99 ++++++---
 .../org/apache/iceberg/flink/IcebergTableSink.java | 233 +++++++++++++++------
 .../sink/dynamic/DynamicTableRecordGenerator.java  |  39 ++++
 .../apache/iceberg/flink/TestIcebergConnector.java | 155 +++++++++++++-
 5 files changed, 450 insertions(+), 94 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
index ab69ec5adc..0612260bfe 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
@@ -67,6 +67,24 @@ class FlinkCreateTableOptions {
           .noDefaultValue()
           .withDescription("Properties for the underlying catalog for iceberg 
table.");
 
+  public static final ConfigOption<Boolean> USE_DYNAMIC_ICEBERG_SINK =
+      ConfigOptions.key("use-dynamic-iceberg-sink")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "Whether to use dynamic iceberg sink for routing data to 
multiple tables. "
+                  + "When enabled, a single sink instance can dynamically 
route records to different "
+                  + "Iceberg tables based on the logic defined in the 
DynamicRecordGenerator implementation. "
+                  + "Requires 'dynamic-record-generator-impl' to be specified. 
"
+                  + "Default is false (uses standard static sink behavior).");
+
+  public static final ConfigOption<String> DYNAMIC_RECORD_GENERATOR_IMPL =
+      ConfigOptions.key("dynamic-record-generator-impl")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Implementation of DynamicTableRecordGenerator class when 
use-dynamic-iceberg-sink is enabled.");
+
   public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
   public static final String CONNECTOR_PROPS_KEY = "connector";
   public static final String LOCATION_KEY = "location";
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index bd79c11560..b2e2e33b92 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -88,26 +88,33 @@ public class FlinkDynamicTableFactory
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
     ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
     Map<String, String> writeProps = resolvedCatalogTable.getOptions();
-    ResolvedSchema resolvedSchema =
-        ResolvedSchema.of(
-            resolvedCatalogTable.getResolvedSchema().getColumns().stream()
-                .filter(Column::isPhysical)
-                .collect(Collectors.toList()));
 
-    TableLoader tableLoader;
-    if (catalog != null) {
-      tableLoader = createTableLoader(catalog, 
objectIdentifier.toObjectPath());
+    Configuration flinkConf = new Configuration();
+    writeProps.forEach(flinkConf::setString);
+
+    boolean useDynamicSink = 
flinkConf.get(FlinkCreateTableOptions.USE_DYNAMIC_ICEBERG_SINK);
+
+    if (useDynamicSink) {
+      return getIcebergTableSinkWithDynamicSinkProps(context, flinkConf, 
writeProps);
     } else {
-      tableLoader =
-          createTableLoader(
-              resolvedCatalogTable,
-              writeProps,
-              objectIdentifier.getDatabaseName(),
-              objectIdentifier.getObjectName());
-    }
+      TableLoader tableLoader;
+      if (catalog != null) {
+        tableLoader = createTableLoader(catalog, 
objectIdentifier.toObjectPath());
+      } else {
+        tableLoader =
+            createTableLoader(
+                resolvedCatalogTable,
+                writeProps,
+                objectIdentifier.getDatabaseName(),
+                objectIdentifier.getObjectName());
+      }
 
-    return new IcebergTableSink(
-        tableLoader, resolvedSchema, context.getConfiguration(), writeProps);
+      return new IcebergTableSink(
+          tableLoader,
+          resolvedCatalogTable.getResolvedSchema(),
+          context.getConfiguration(),
+          writeProps);
+    }
   }
 
   @Override
@@ -123,6 +130,8 @@ public class FlinkDynamicTableFactory
     Set<ConfigOption<?>> options = Sets.newHashSet();
     options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
     options.add(FlinkCreateTableOptions.CATALOG_TABLE);
+    options.add(FlinkCreateTableOptions.USE_DYNAMIC_ICEBERG_SINK);
+    options.add(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
     return options;
   }
 
@@ -131,6 +140,46 @@ public class FlinkDynamicTableFactory
     return FACTORY_IDENTIFIER;
   }
 
+  private IcebergTableSink getIcebergTableSinkWithDynamicSinkProps(
+      Context context, Configuration flinkConf, Map<String, String> 
writeProps) {
+    String dynamicRecordGeneratorImpl =
+        flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
+    Preconditions.checkArgument(
+        dynamicRecordGeneratorImpl != null,
+        "Invalid dynamic record generator value: null. %s  must be specified 
when use-dynamic-iceberg-sink is true.",
+        FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());
+
+    CatalogLoader catalogLoader;
+    if (catalog != null) {
+      catalogLoader = catalog.getCatalogLoader();
+    } else {
+      FlinkCatalog flinkCatalog =
+          createCatalogLoader(writeProps, 
flinkConf.get(FlinkCreateTableOptions.CATALOG_NAME));
+      catalogLoader = flinkCatalog.getCatalogLoader();
+    }
+
+    ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+
+    return new IcebergTableSink(
+        catalogLoader,
+        dynamicRecordGeneratorImpl,
+        resolvedCatalogTable.getResolvedSchema(),
+        context.getConfiguration(),
+        writeProps);
+  }
+
+  private static FlinkCatalog createCatalogLoader(
+      Map<String, String> tableProps, String catalogName) {
+    Preconditions.checkArgument(
+        catalogName != null,
+        "Invalid catalog name: null. Set %s create table option.",
+        FlinkCreateTableOptions.CATALOG_NAME);
+
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    return (FlinkCatalog)
+        factory.createCatalog(catalogName, tableProps, 
FlinkCatalogFactory.clusterHadoopConf());
+  }
+
   private static TableLoader createTableLoader(
       ResolvedCatalogTable resolvedCatalogTable,
       Map<String, String> tableProps,
@@ -143,21 +192,16 @@ public class FlinkDynamicTableFactory
     mergedProps.forEach(flinkConf::setString);
 
     String catalogName = flinkConf.get(FlinkCreateTableOptions.CATALOG_NAME);
-    Preconditions.checkNotNull(
-        catalogName,
-        "Table property '%s' cannot be null",
-        FlinkCreateTableOptions.CATALOG_NAME.key());
 
     String catalogDatabase = 
flinkConf.get(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
-    Preconditions.checkNotNull(catalogDatabase, "The iceberg database name 
cannot be null");
+    Preconditions.checkArgument(
+        catalogDatabase != null,
+        "Invalid database name: null. Set %s create table option or specify 
fully qualified table name.",
+        FlinkCreateTableOptions.CATALOG_DATABASE);
 
     String catalogTable = flinkConf.get(FlinkCreateTableOptions.CATALOG_TABLE, 
tableName);
-    Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be 
null");
 
-    org.apache.hadoop.conf.Configuration hadoopConf = 
FlinkCatalogFactory.clusterHadoopConf();
-    FlinkCatalogFactory factory = new FlinkCatalogFactory();
-    FlinkCatalog flinkCatalog =
-        (FlinkCatalog) factory.createCatalog(catalogName, mergedProps, 
hadoopConf);
+    FlinkCatalog flinkCatalog = createCatalogLoader(mergedProps, catalogName);
     ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
 
     // Create database if not exists in the external catalog.
@@ -229,7 +273,6 @@ public class FlinkDynamicTableFactory
   }
 
   private static TableLoader createTableLoader(FlinkCatalog catalog, 
ObjectPath objectPath) {
-    Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
     return TableLoader.fromCatalog(catalog.getCatalogLoader(), 
catalog.toIdentifier(objectPath));
   }
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
index fa2fb89221..3fc3d093f6 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
@@ -20,7 +20,11 @@ package org.apache.iceberg.flink;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -28,15 +32,24 @@ import 
org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink;
+import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;
+import org.apache.iceberg.flink.sink.dynamic.DynamicTableRecordGenerator;
+import org.apache.iceberg.flink.sink.dynamic.TableCreator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.PropertyUtil;
 
 public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
+  private final CatalogLoader catalogLoader;
 
   @SuppressWarnings("deprecation")
   @Deprecated
@@ -45,16 +58,20 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
   private final ResolvedSchema resolvedSchema;
   private final ReadableConfig readableConfig;
   private final Map<String, String> writeProps;
-
+  private final String dynamicRecordGeneratorImpl;
   private boolean overwrite = false;
+  private boolean useDynamicSink = false;
 
   private IcebergTableSink(IcebergTableSink toCopy) {
     this.tableLoader = toCopy.tableLoader;
+    this.catalogLoader = toCopy.catalogLoader;
     this.tableSchema = toCopy.tableSchema;
     this.resolvedSchema = toCopy.resolvedSchema;
     this.overwrite = toCopy.overwrite;
     this.readableConfig = toCopy.readableConfig;
     this.writeProps = toCopy.writeProps;
+    this.dynamicRecordGeneratorImpl = toCopy.dynamicRecordGeneratorImpl;
+    this.useDynamicSink = toCopy.useDynamicSink;
   }
 
   /**
@@ -68,10 +85,12 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
       ReadableConfig readableConfig,
       Map<String, String> writeProps) {
     this.tableLoader = tableLoader;
+    this.catalogLoader = null;
     this.tableSchema = tableSchema;
     this.resolvedSchema = null;
     this.readableConfig = readableConfig;
     this.writeProps = writeProps;
+    this.dynamicRecordGeneratorImpl = null;
   }
 
   public IcebergTableSink(
@@ -80,10 +99,28 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
       ReadableConfig readableConfig,
       Map<String, String> writeProps) {
     this.tableLoader = tableLoader;
+    this.catalogLoader = null;
     this.tableSchema = null;
     this.resolvedSchema = resolvedSchema;
     this.readableConfig = readableConfig;
     this.writeProps = writeProps;
+    this.dynamicRecordGeneratorImpl = null;
+  }
+
+  public IcebergTableSink(
+      CatalogLoader catalogLoader,
+      String dynamicRecordGeneratorImpl,
+      ResolvedSchema resolvedSchema,
+      ReadableConfig readableConfig,
+      Map<String, String> writeProps) {
+    this.tableLoader = null;
+    this.catalogLoader = catalogLoader;
+    this.dynamicRecordGeneratorImpl = dynamicRecordGeneratorImpl;
+    this.readableConfig = readableConfig;
+    this.writeProps = writeProps;
+    this.tableSchema = null;
+    this.resolvedSchema = resolvedSchema;
+    this.useDynamicSink = true;
   }
 
   @SuppressWarnings("deprecation")
@@ -93,66 +130,40 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
         !overwrite || context.isBounded(),
         "Unbounded data stream doesn't support overwrite operation.");
 
-    if (resolvedSchema != null) {
-      List<String> equalityColumns =
-          resolvedSchema
-              .getPrimaryKey()
-              .map(UniqueConstraint::getColumns)
-              .orElseGet(ImmutableList::of);
-
-      return (DataStreamSinkProvider)
-          (providerContext, dataStream) -> {
-            if (Boolean.TRUE.equals(
-                
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
-              return IcebergSink.forRowData(dataStream)
-                  .tableLoader(tableLoader)
-                  .resolvedSchema(resolvedSchema)
-                  .equalityFieldColumns(equalityColumns)
-                  .overwrite(overwrite)
-                  .setAll(writeProps)
-                  .flinkConf(readableConfig)
-                  .append();
-            } else {
-              return FlinkSink.forRowData(dataStream)
-                  .tableLoader(tableLoader)
-                  .resolvedSchema(resolvedSchema)
-                  .equalityFieldColumns(equalityColumns)
-                  .overwrite(overwrite)
-                  .setAll(writeProps)
-                  .flinkConf(readableConfig)
-                  .append();
-            }
-          };
-    } else {
-      List<String> equalityColumns =
-          tableSchema
-              .getPrimaryKey()
-              
.map(org.apache.flink.table.legacy.api.constraints.UniqueConstraint::getColumns)
-              .orElseGet(ImmutableList::of);
-
-      return (DataStreamSinkProvider)
-          (providerContext, dataStream) -> {
-            if 
(readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
-              return IcebergSink.forRowData(dataStream)
-                  .tableLoader(tableLoader)
-                  .tableSchema(tableSchema)
-                  .equalityFieldColumns(equalityColumns)
-                  .overwrite(overwrite)
-                  .setAll(writeProps)
-                  .flinkConf(readableConfig)
-                  .append();
-            } else {
-              return FlinkSink.forRowData(dataStream)
-                  .tableLoader(tableLoader)
-                  .tableSchema(tableSchema)
-                  .equalityFieldColumns(equalityColumns)
-                  .overwrite(overwrite)
-                  .setAll(writeProps)
-                  .flinkConf(readableConfig)
-                  .append();
-            }
-          };
-    }
+    return (DataStreamSinkProvider)
+        (providerContext, dataStream) -> {
+          if (useDynamicSink) {
+            return createDynamicIcebergSink(dataStream);
+          }
+
+          ResolvedSchema physicalColumnsOnlySchema = null;
+          List<String> equalityColumns;
+          if (resolvedSchema != null) {
+            physicalColumnsOnlySchema =
+                ResolvedSchema.of(
+                    resolvedSchema.getColumns().stream()
+                        .filter(Column::isPhysical)
+                        .collect(Collectors.toList()));
+
+            equalityColumns =
+                physicalColumnsOnlySchema
+                    .getPrimaryKey()
+                    .map(UniqueConstraint::getColumns)
+                    .orElseGet(ImmutableList::of);
+          } else {
+            equalityColumns =
+                tableSchema
+                    .getPrimaryKey()
+                    
.map(org.apache.flink.table.legacy.api.constraints.UniqueConstraint::getColumns)
+                    .orElseGet(ImmutableList::of);
+          }
+
+          if 
(readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
+            return createIcebergSink(dataStream, equalityColumns, 
physicalColumnsOnlySchema);
+          } else {
+            return createLegacySink(dataStream, equalityColumns, 
physicalColumnsOnlySchema);
+          }
+        };
   }
 
   @Override
@@ -184,4 +195,104 @@ public class IcebergTableSink implements 
DynamicTableSink, SupportsPartitioning,
   public void applyOverwrite(boolean newOverwrite) {
     this.overwrite = newOverwrite;
   }
+
+  private DataStreamSink<?> createLegacySink(
+      DataStream<RowData> dataStream,
+      List<String> equalityColumns,
+      ResolvedSchema physicalColumnsOnlySchema) {
+    FlinkSink.Builder builder =
+        FlinkSink.forRowData(dataStream)
+            .tableLoader(tableLoader)
+            .equalityFieldColumns(equalityColumns)
+            .overwrite(overwrite)
+            .setAll(writeProps)
+            .flinkConf(readableConfig);
+
+    if (physicalColumnsOnlySchema != null) {
+      builder = builder.resolvedSchema(physicalColumnsOnlySchema);
+    } else {
+      builder = builder.tableSchema(tableSchema);
+    }
+
+    return builder.append();
+  }
+
+  private DataStreamSink<?> createIcebergSink(
+      DataStream<RowData> dataStream,
+      List<String> equalityColumns,
+      ResolvedSchema physicalColumnsOnlySchema) {
+    IcebergSink.Builder builder =
+        IcebergSink.forRowData(dataStream)
+            .tableLoader(tableLoader)
+            .equalityFieldColumns(equalityColumns)
+            .overwrite(overwrite)
+            .setAll(writeProps)
+            .flinkConf(readableConfig);
+
+    if (physicalColumnsOnlySchema != null) {
+      builder = builder.resolvedSchema(physicalColumnsOnlySchema);
+    } else {
+      builder = builder.tableSchema(tableSchema);
+    }
+
+    return builder.append();
+  }
+
+  private DataStreamSink<?> createDynamicIcebergSink(DataStream<RowData> 
dataStream) {
+    Preconditions.checkArgument(
+        catalogLoader != null && dynamicRecordGeneratorImpl != null,
+        "Invalid value catalogLoader: %s, DynamicRecordGenerator 
Implementation class: %s. "
+            + "Both should be not null to use dynamic iceberg sink.",
+        catalogLoader,
+        dynamicRecordGeneratorImpl);
+
+    TableCreator tableCreator = createTableCreator();
+    DynamicRecordGenerator<RowData> generator =
+        createDynamicRecordGenerator(dynamicRecordGeneratorImpl);
+
+    DynamicIcebergSink.Builder<RowData> builder =
+        DynamicIcebergSink.forInput(dataStream)
+            .generator(generator)
+            .catalogLoader(catalogLoader)
+            .setAll(writeProps)
+            .tableCreator(tableCreator)
+            .flinkConf(readableConfig);
+
+    return builder.append();
+  }
+
+  private TableCreator createTableCreator() {
+    final Map<String, String> tableProperties =
+        PropertyUtil.propertiesWithPrefix(writeProps, "table.props.");
+    final String location = writeProps.get("location");
+
+    return (catalog, identifier, schema, spec) ->
+        catalog
+            .buildTable(identifier, schema)
+            .withPartitionSpec(spec)
+            .withLocation(location)
+            .withProperties(tableProperties)
+            .create();
+  }
+
+  private DynamicTableRecordGenerator createDynamicRecordGenerator(String 
generatorImpl) {
+    RowType rowType = (RowType) 
resolvedSchema.toSourceRowDataType().getLogicalType();
+
+    DynConstructors.Ctor<DynamicTableRecordGenerator> ctor;
+
+    try {
+      ctor =
+          DynConstructors.builder(DynamicTableRecordGenerator.class)
+              .loader(IcebergTableSink.class.getClassLoader())
+              .impl(generatorImpl, RowType.class)
+              .buildChecked();
+      return ctor.newInstance(rowType);
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format("Class %s does not implement 
DynamicRecordGeneratorSQL", generatorImpl), e);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to instantiate DynamicRecordGeneratorSQL %s", 
generatorImpl), e);
+    }
+  }
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java
new file mode 100644
index 0000000000..684da9bb54
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Abstract base class for SQL-based dynamic record generators. Users will 
extend this class to
+ * create a DynamicRecord from RowData.
+ */
+public abstract class DynamicTableRecordGenerator implements 
DynamicRecordGenerator<RowData> {
+
+  private final RowType rowType;
+
+  public DynamicTableRecordGenerator(RowType rowType) {
+    this.rowType = rowType;
+  }
+
+  protected RowType rowType() {
+    return rowType;
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 0a6d5e44ca..309b55c115 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -25,23 +25,37 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.util.Map;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
+import org.apache.iceberg.flink.sink.dynamic.DynamicTableRecordGenerator;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
 import org.apache.thrift.TException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -70,16 +84,25 @@ public class TestIcebergConnector extends TestBase {
         new Object[] {
           "testhadoop",
           ImmutableMap.of(
-              "connector", "iceberg",
-              "catalog-type", "hadoop"),
+              "connector",
+              "iceberg",
+              "catalog-type",
+              "hadoop",
+              "catalog-database",
+              "test_database"),
           true
         },
         new Object[] {
           "testhadoop",
           ImmutableMap.of(
-              "connector", "iceberg",
-              "catalog-type", "hadoop",
-              "catalog-table", "not_existing_table"),
+              "connector",
+              "iceberg",
+              "catalog-type",
+              "hadoop",
+              "catalog-database",
+              "test_database",
+              "catalog-table",
+              "not_existing_table"),
           true
         },
         new Object[] {
@@ -328,4 +351,126 @@ public class TestIcebergConnector extends TestBase {
       throw new UncheckedIOException(e);
     }
   }
+
+  @TestTemplate
+  public void testCreateDynamicIcebergSink() throws 
DatabaseAlreadyExistException {
+    Map<String, String> tableProps = createTableProps();
+    Map<String, String> dynamicTableProps = Maps.newHashMap(tableProps);
+    dynamicTableProps.put("use-dynamic-iceberg-sink", "true");
+    dynamicTableProps.put(
+        "dynamic-record-generator-impl", 
SimpleRowDataTableRecordGenerator.class.getName());
+    dynamicTableProps.put("table.props.key1", "val1");
+
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    FlinkCatalog flinkCatalog =
+        (FlinkCatalog) factory.createCatalog(catalogName, tableProps, new 
Configuration());
+    flinkCatalog.createDatabase(
+        databaseName(), new CatalogDatabaseImpl(Maps.newHashMap(), null), 
true);
+
+    // Create table with dynamic sink enabled
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, database_name STRING, 
table_name STRING) WITH %s",
+        TABLE_NAME + "_dynamic", toWithClause(dynamicTableProps));
+
+    // Insert data with database and table information
+    sql(
+        "INSERT INTO %s VALUES (1, 'AAA', '%s', '%s'), (2, 'BBB', '%s', '%s'), 
(3, 'CCC', '%s', '%s')",
+        TABLE_NAME + "_dynamic",
+        databaseName(),
+        tableName(),
+        databaseName(),
+        tableName(),
+        databaseName(),
+        tableName());
+
+    // Verify the table and data exists
+    ObjectPath objectPath = new ObjectPath(databaseName(), tableName());
+    assertThat(flinkCatalog.tableExists(objectPath)).isTrue();
+    Table table =
+        flinkCatalog
+            .getCatalogLoader()
+            .loadCatalog()
+            .loadTable(TableIdentifier.of(databaseName(), tableName()));
+    assertThat(table.properties()).containsEntry("key1", "val1");
+
+    tableProps.put("catalog-database", databaseName());
+    sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", tableName(), 
toWithClause(tableProps));
+    assertThat(sql("SELECT * FROM %s", tableName()))
+        .containsExactlyInAnyOrder(Row.of(1L, "AAA"), Row.of(2L, "BBB"), 
Row.of(3L, "CCC"));
+  }
+
+  @TestTemplate
+  public void testMissingDynamicRecordGeneratorImpl() throws 
DatabaseAlreadyExistException {
+    Map<String, String> tableProps = createTableProps();
+    tableProps.put("use-dynamic-iceberg-sink", "true");
+
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    FlinkCatalog flinkCatalog =
+        (FlinkCatalog) factory.createCatalog(catalogName, tableProps, new 
Configuration());
+    flinkCatalog.createDatabase(
+        databaseName(), new CatalogDatabaseImpl(Maps.newHashMap(), null), 
true);
+
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, database_name STRING, 
table_name STRING) WITH %s",
+        TABLE_NAME + "_dynamic", toWithClause(tableProps));
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "INSERT INTO %s VALUES (1, 'AAA', '%s', '%s')",
+                    TABLE_NAME + "_dynamic", databaseName(), tableName()))
+        .cause()
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Invalid dynamic record generator value: null. 
dynamic-record-generator-impl  must be specified when use-dynamic-iceberg-sink 
is true.");
+  }
+
+  public static class SimpleRowDataTableRecordGenerator extends 
DynamicTableRecordGenerator {
+
+    private int databaseFieldIndex = -1;
+    private int tableFieldIndex = -1;
+
+    public SimpleRowDataTableRecordGenerator(RowType rowType) {
+      super(rowType);
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+      String[] fieldNames = rowType().getFieldNames().toArray(new String[0]);
+
+      for (int i = 0; i < fieldNames.length; i++) {
+        if ("database_name".equals(fieldNames[i])) {
+          databaseFieldIndex = i;
+        } else if ("table_name".equals(fieldNames[i])) {
+          tableFieldIndex = i;
+        }
+      }
+    }
+
+    @Override
+    public void generate(RowData inputRecord, Collector<DynamicRecord> out) 
throws Exception {
+      // Extract database and table names using the discovered field indexes
+      String databaseName = 
inputRecord.getString(databaseFieldIndex).toString();
+      String tableName = inputRecord.getString(tableFieldIndex).toString();
+
+      // Create schema for the actual data fields (excluding metadata fields)
+      Schema schema =
+          new Schema(
+              Types.NestedField.required(0, "id", Types.LongType.get()),
+              Types.NestedField.required(1, "data", Types.StringType.get()));
+
+      TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, 
tableName);
+
+      DynamicRecord dynamicRecord =
+          new DynamicRecord(
+              tableIdentifier,
+              "main",
+              schema,
+              inputRecord,
+              PartitionSpec.unpartitioned(),
+              DistributionMode.NONE,
+              1);
+      out.collect(dynamicRecord);
+    }
+  }
 }

Reply via email to