mxm commented on code in PR #15471:
URL: https://github.com/apache/iceberg/pull/15471#discussion_r2894604272


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java:
##########
@@ -28,12 +33,44 @@
 public abstract class DynamicTableRecordGenerator implements 
DynamicRecordGenerator<RowData> {
 
   private final RowType rowType;
+  private final Map<String, String> writeProps;
 
-  public DynamicTableRecordGenerator(RowType rowType) {
+  public DynamicTableRecordGenerator(RowType rowType, Map<String, String> 
writeProps) {
     this.rowType = rowType;
+    this.writeProps = writeProps;
   }
 
-  protected RowType rowType() {
+  public RowType rowType() {
     return rowType;
   }
+
+  public Map<String, String> writeProps() {
+    return writeProps;
+  }
+
+  protected Map<String, Integer> getFieldPositionIndex() {
+    Map<String, Integer> fieldNameToPosition = Maps.newHashMap();
+    List<RowType.RowField> fields = rowType.getFields();
+
+    for (int i = 0; i < fields.size(); i++) {
+      RowType.RowField field = fields.get(i);
+      fieldNameToPosition.put(field.getName(), i);
+    }
+
+    return fieldNameToPosition;
+  }
+
+  protected void validateRequiredFieldAndType(String columnName, LogicalType 
expectedType) {

Review Comment:
   Should this method be moved to `VariantAvroDynamicTableRecordGenerator` or a 
utility class instead of living in the base class?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java:
##########
@@ -28,12 +33,44 @@
 public abstract class DynamicTableRecordGenerator implements 
DynamicRecordGenerator<RowData> {
 
   private final RowType rowType;
+  private final Map<String, String> writeProps;
 
-  public DynamicTableRecordGenerator(RowType rowType) {
+  public DynamicTableRecordGenerator(RowType rowType, Map<String, String> 
writeProps) {
     this.rowType = rowType;
+    this.writeProps = writeProps;
   }
 
-  protected RowType rowType() {
+  public RowType rowType() {
     return rowType;
   }
+
+  public Map<String, String> writeProps() {
+    return writeProps;
+  }
+
+  protected Map<String, Integer> getFieldPositionIndex() {
+    Map<String, Integer> fieldNameToPosition = Maps.newHashMap();
+    List<RowType.RowField> fields = rowType.getFields();
+
+    for (int i = 0; i < fields.size(); i++) {
+      RowType.RowField field = fields.get(i);
+      fieldNameToPosition.put(field.getName(), i);
+    }
+
+    return fieldNameToPosition;
+  }
+
+  protected void validateRequiredFieldAndType(String columnName, LogicalType 
expectedType) {
+    int fieldIndex = rowType.getFieldIndex(columnName);
+
+    Preconditions.checkArgument(fieldIndex != -1, "Missing column %s", 
columnName);

Review Comment:
   Should we include the expected type in this error message as well, not just the column name?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/VariantRowDataWrapper.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
+
+public class VariantRowDataWrapper implements RowData {
+
+  private final RowType rowType;
+  private RowKind rowKind;
+  private Variant variantData;
+
+  public VariantRowDataWrapper(RowType rowType) {
+    this(rowType, RowKind.INSERT);
+  }
+
+  public VariantRowDataWrapper(RowType rowType, RowKind rowKind) {
+    this.rowType = rowType;
+    this.rowKind = rowKind;
+  }
+
+  public VariantRowDataWrapper wrap(Variant variant) {
+    this.variantData = variant;
+    return this;
+  }
+
+  @Override
+  public int getArity() {
+    return rowType.getFieldCount();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return rowKind;
+  }
+
+  @Override
+  public void setRowKind(RowKind rowKind) {
+    this.rowKind = rowKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    Variant value = variantData.getField(fieldName);
+    return value == null || value.isNull();
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getBoolean();
+  }
+
+  @Override
+  public byte getByte(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getByte();
+  }
+
+  @Override
+  public short getShort(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getShort();
+  }
+
+  @Override
+  public int getInt(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return getIntValue(variantData.getField(fieldName));
+  }
+
+  @Override
+  public long getLong(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return getLongValue(variantData.getField(fieldName));
+  }
+
+  @Override
+  public float getFloat(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getFloat();
+  }
+
+  @Override
+  public double getDouble(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return getDoubleValue(variantData.getField(fieldName));
+  }
+
+  @Override
+  public StringData getString(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return isNullAt(pos)
+        ? null
+        : StringData.fromString(variantData.getField(fieldName).getString());
+  }
+
+  @Override
+  public DecimalData getDecimal(int pos, int precision, int scale) {
+    String fieldName = getFieldNameByIndex(pos);
+    return isNullAt(pos)
+        ? null
+        : DecimalData.fromBigDecimal(
+            variantData.getField(fieldName).getDecimal(), precision, scale);
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int precision) {
+    String fieldName = getFieldNameByIndex(pos);
+    return isNullAt(pos) ? null : 
getTimestamp(variantData.getField(fieldName));
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int pos) {
+    throw new UnsupportedOperationException("RawValue in Variant column is not 
supported.");
+  }
+
+  @Override
+  public byte[] getBinary(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getBytes();
+  }
+
+  @Override
+  public ArrayData getArray(int pos) {
+    //    ArrayType arrayType = (ArrayType) rowType.getTypeAt(pos);
+    //    LogicalType elementType = arrayType.getElementType();
+    //
+    //    String fieldName = getFieldNameByIndex(pos);
+    //    Variant arrayVariant = variantData.getField(fieldName);
+    //
+    //    if (arrayVariant != null) {
+    //        
Preconditions.checkArgument(arrayVariant.getType().equals(Variant.Type.ARRAY),
+    //                "Expected Array type, but got " + 
arrayVariant.getType());
+    //        int arraySize = arrayVariant.getArraySize();
+    //        Object[] elements = new Object[arraySize];
+    //
+    //        for (int i=0;i< arraySize;i++) {
+    //            Variant element = arrayVariant.getElement(i);
+    //            elements[i] = element == null ? null : 
getElementValue(elementType, element);
+    //        }
+    //
+    //        return new GenericArrayData(elements);
+    //    }
+    //
+    //    return null;
+    throw new UnsupportedOperationException("Array Type is not supported 
yet.");
+  }
+
+  @Override
+  public MapData getMap(int pos) {
+    //    MapType mapType = (MapType) rowType.getTypeAt(pos);
+    //    LogicalType keyType = mapType.getKeyType();
+    //    LogicalType valueType = mapType.getValueType();
+    //
+    //    Preconditions.checkArgument(keyType instanceof VarCharType,
+    //            "Map with STRING key types are only supported in Variant to 
RowData conversion");
+    //
+    //    String mapFieldName = getFieldNameByIndex(pos);
+    //    Variant mapVariant = variantData.getField(mapFieldName);
+    //
+    //    Map<Object, Object> mapData = Maps.newHashMap();
+    //    if (mapVariant != null) {
+    //        List<String> keys = mapVariant.getFieldNames();
+    //        for (String key: keys) {
+    //            mapData.put(StringData.fromString(key), 
getElementValue(valueType,
+    // mapVariant.getField(key)));
+    //        }
+    //
+    //        return new GenericMapData(mapData);
+    //    }
+    //
+    //    return null;
+    throw new UnsupportedOperationException("Map Type is not supported yet.");
+  }
+
+  @Override
+  public RowData getRow(int pos, int numFields) {
+    String fieldName = getFieldNameByIndex(pos);
+    VariantRowDataWrapper rowDataWrapper =
+        new VariantRowDataWrapper((RowType) rowType.getTypeAt(pos));
+    return rowDataWrapper.wrap(variantData.getField(fieldName));
+  }
+
+  @Override
+  public Variant getVariant(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName);
+  }
+
+  /* private Object getElementValue(LogicalType elementType, Variant variant) {
+    LogicalTypeRoot root = elementType.getTypeRoot();
+
+    switch (root) {
+      case NULL:
+        return null;
+      case BOOLEAN:
+        return variant.getBoolean();
+      case TINYINT:
+        return variant.getByte();
+      case SMALLINT:
+        return variant.getShort();
+      case INTEGER:
+        return getIntValue(variant);
+      case BIGINT:
+        return getLongValue(variant);
+      case FLOAT:
+        return variant.getFloat();
+      case DOUBLE:
+        return getDoubleValue(variant);
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) elementType;
+        return DecimalData.fromBigDecimal(
+            variant.getDecimal(), decimalType.getPrecision(), 
decimalType.getScale());
+      case CHAR:
+      case VARCHAR:
+        return StringData.fromString(variant.getString());
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return getTimestamp(variant);
+      case BINARY:
+      case VARBINARY:
+        return variant.getBytes();
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported Element type in Array/Map type:" + elementType);
+    }
+  }*/

Review Comment:
   This commented-out code should either be removed, or fixed properly by adding 
the missing Flink classes that it depends on to Iceberg.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java:
##########
@@ -28,12 +33,44 @@
 public abstract class DynamicTableRecordGenerator implements 
DynamicRecordGenerator<RowData> {
 
   private final RowType rowType;
+  private final Map<String, String> writeProps;
 
-  public DynamicTableRecordGenerator(RowType rowType) {
+  public DynamicTableRecordGenerator(RowType rowType, Map<String, String> 
writeProps) {
     this.rowType = rowType;
+    this.writeProps = writeProps;
   }
 
-  protected RowType rowType() {
+  public RowType rowType() {

Review Comment:
   +1



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/VariantAvroDynamicTableRecordGenerator.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Map;
+import org.apache.avro.Schema.Parser;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkCreateTableOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.data.VariantRowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+
+public class VariantAvroDynamicTableRecordGenerator extends 
DynamicTableRecordGenerator {
+  private transient Map<String, Integer> fieldNameToPosition;
+  private transient Map<TableIdentifier, SchemaAndPartitionSpecCacheItem> 
tableCache;
+  private static final Parser PARSER = new Parser();
+  private static final String DEFAULT_CACHE_MAX_SIZE = "100";
+  private static final Splitter COMMA = Splitter.on(',');
+  private transient int maxCacheSize;
+
+  public VariantAvroDynamicTableRecordGenerator(RowType rowType, Map<String, 
String> writeProps) {
+    super(rowType, writeProps);
+
+    String catalogDatabaseColumn = 
FlinkCreateTableOptions.CATALOG_DATABASE.key();
+    Preconditions.checkArgument(
+        rowType.getFieldIndex(catalogDatabaseColumn) != -1
+            || writeProps().containsKey(catalogDatabaseColumn),
+        "Invalid %s:null." + "Either %s column should be passed in Row or set 
in table options",
+        catalogDatabaseColumn,
+        catalogDatabaseColumn);
+
+    String catalogTableColumn = FlinkCreateTableOptions.CATALOG_TABLE.key();
+    Preconditions.checkArgument(
+        rowType.getFieldIndex(catalogTableColumn) != -1
+            || writeProps().containsKey(catalogTableColumn),
+        "Invalid %s:null." + "Either %s column should be passed in Row or set 
in table options",
+        catalogTableColumn,
+        catalogTableColumn);
+
+    validateRequiredFieldAndType("data", new VariantType(false));
+    validateRequiredFieldAndType("avro_schema", new VarCharType(false, 
Integer.MAX_VALUE));
+    validateRequiredFieldAndType("avro_schema_id", new VarCharType(false, 
Integer.MAX_VALUE));
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+
+    this.fieldNameToPosition = getFieldPositionIndex();
+
+    this.maxCacheSize =
+        Integer.parseInt(
+            writeProps().getOrDefault("schema-cache-max-size", 
DEFAULT_CACHE_MAX_SIZE));
+    this.tableCache = new LRUCache<>(maxCacheSize);
+  }
+
+  @Override
+  public void generate(RowData inputRecord, Collector<DynamicRecord> out) 
throws Exception {
+    String catalogDatabaseColumn = 
FlinkCreateTableOptions.CATALOG_DATABASE.key();
+    String catalogDb =
+        getStringColumnValue(
+            inputRecord, catalogDatabaseColumn, 
writeProps().get(catalogDatabaseColumn));
+
+    String catalogTableColumn = FlinkCreateTableOptions.CATALOG_TABLE.key();
+    String catalogTable =
+        getStringColumnValue(inputRecord, catalogTableColumn, 
writeProps().get(catalogTableColumn));
+
+    // All write options overrides should be inferred in DynamicIcebergSink
+    String branch = getStringColumnValue(inputRecord, 
FlinkWriteOptions.BRANCH.key());
+
+    DistributionMode distributionMode = null;
+    String distributionModeStr =
+        getStringColumnValue(inputRecord, 
FlinkWriteOptions.DISTRIBUTION_MODE.key());
+    if (distributionModeStr != null) {
+      distributionMode = DistributionMode.fromName(distributionModeStr);
+    }
+
+    int writeParallelism = -1;
+    Integer pos = 
fieldNameToPosition.get(FlinkWriteOptions.WRITE_PARALLELISM.key());
+    if (pos != null) {
+      writeParallelism = inputRecord.getInt(pos);
+    }
+
+    Variant variantData = 
inputRecord.getVariant(fieldNameToPosition.get("data"));
+    String avroSchema = getStringColumnValue(inputRecord, "avro_schema");
+    String avroSchemaId = getStringColumnValue(inputRecord, "avro_schema_id");
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(catalogDb, 
catalogTable);
+    SchemaAndPartitionSpecCacheItem cacheItem =
+        tableCache.computeIfAbsent(
+            tableIdentifier, identifier -> new 
SchemaAndPartitionSpecCacheItem(maxCacheSize));
+
+    SchemaCacheItem schemaCacheItem = 
cacheItem.getOrCreateSchema(avroSchemaId, avroSchema);
+
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    String partitionCols = getStringColumnValue(inputRecord, "partition_cols", 
null);
+    if (partitionCols != null) {
+      partitionSpec =
+          cacheItem.getOrCreatePartitionSpec(partitionCols, 
schemaCacheItem.tableSchema());
+    }
+
+    out.collect(
+        new DynamicRecord(
+            tableIdentifier,
+            branch,
+            schemaCacheItem.tableSchema(),
+            schemaCacheItem.variantRowDataWrapper().wrap(variantData),
+            partitionSpec,
+            distributionMode,
+            writeParallelism));
+  }
+
+  private String getStringColumnValue(RowData rowData, String col, String 
defaultValue) {
+    Integer pos = fieldNameToPosition.get(col);
+    if (pos != null) {
+      StringData value = rowData.getString(pos);
+      return value == null ? defaultValue : value.toString();
+    }
+
+    return defaultValue;
+  }
+
+  private String getStringColumnValue(RowData rowData, String col) {
+    return getStringColumnValue(rowData, col, null);
+  }
+
+  private static class SchemaAndPartitionSpecCacheItem {

Review Comment:
   We should add a basic test for the cache.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/VariantAvroDynamicTableRecordGenerator.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Map;
+import org.apache.avro.Schema.Parser;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkCreateTableOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.data.VariantRowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+
+public class VariantAvroDynamicTableRecordGenerator extends 
DynamicTableRecordGenerator {
+  private transient Map<String, Integer> fieldNameToPosition;
+  private transient Map<TableIdentifier, SchemaAndPartitionSpecCacheItem> 
tableCache;
+  private static final Parser PARSER = new Parser();
+  private static final String DEFAULT_CACHE_MAX_SIZE = "100";
+  private static final Splitter COMMA = Splitter.on(',');
+  private transient int maxCacheSize;
+
+  public VariantAvroDynamicTableRecordGenerator(RowType rowType, Map<String, 
String> writeProps) {
+    super(rowType, writeProps);
+
+    String catalogDatabaseColumn = 
FlinkCreateTableOptions.CATALOG_DATABASE.key();
+    Preconditions.checkArgument(
+        rowType.getFieldIndex(catalogDatabaseColumn) != -1
+            || writeProps().containsKey(catalogDatabaseColumn),
+        "Invalid %s:null." + "Either %s column should be passed in Row or set 
in table options",
+        catalogDatabaseColumn,
+        catalogDatabaseColumn);
+
+    String catalogTableColumn = FlinkCreateTableOptions.CATALOG_TABLE.key();
+    Preconditions.checkArgument(
+        rowType.getFieldIndex(catalogTableColumn) != -1
+            || writeProps().containsKey(catalogTableColumn),
+        "Invalid %s:null." + "Either %s column should be passed in Row or set 
in table options",
+        catalogTableColumn,
+        catalogTableColumn);
+
+    validateRequiredFieldAndType("data", new VariantType(false));
+    validateRequiredFieldAndType("avro_schema", new VarCharType(false, 
Integer.MAX_VALUE));
+    validateRequiredFieldAndType("avro_schema_id", new VarCharType(false, 
Integer.MAX_VALUE));
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+
+    this.fieldNameToPosition = getFieldPositionIndex();
+
+    this.maxCacheSize =
+        Integer.parseInt(
+            writeProps().getOrDefault("schema-cache-max-size", 
DEFAULT_CACHE_MAX_SIZE));
+    this.tableCache = new LRUCache<>(maxCacheSize);
+  }
+
+  @Override
+  public void generate(RowData inputRecord, Collector<DynamicRecord> out) 
throws Exception {
+    String catalogDatabaseColumn = 
FlinkCreateTableOptions.CATALOG_DATABASE.key();
+    String catalogDb =
+        getStringColumnValue(
+            inputRecord, catalogDatabaseColumn, 
writeProps().get(catalogDatabaseColumn));
+
+    String catalogTableColumn = FlinkCreateTableOptions.CATALOG_TABLE.key();
+    String catalogTable =
+        getStringColumnValue(inputRecord, catalogTableColumn, 
writeProps().get(catalogTableColumn));
+
+    // All write options overrides should be inferred in DynamicIcebergSink
+    String branch = getStringColumnValue(inputRecord, 
FlinkWriteOptions.BRANCH.key());
+
+    DistributionMode distributionMode = null;
+    String distributionModeStr =
+        getStringColumnValue(inputRecord, 
FlinkWriteOptions.DISTRIBUTION_MODE.key());
+    if (distributionModeStr != null) {
+      distributionMode = DistributionMode.fromName(distributionModeStr);
+    }
+
+    int writeParallelism = -1;
+    Integer pos = 
fieldNameToPosition.get(FlinkWriteOptions.WRITE_PARALLELISM.key());
+    if (pos != null) {
+      writeParallelism = inputRecord.getInt(pos);
+    }
+
+    Variant variantData = 
inputRecord.getVariant(fieldNameToPosition.get("data"));
+    String avroSchema = getStringColumnValue(inputRecord, "avro_schema");
+    String avroSchemaId = getStringColumnValue(inputRecord, "avro_schema_id");
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(catalogDb, 
catalogTable);
+    SchemaAndPartitionSpecCacheItem cacheItem =
+        tableCache.computeIfAbsent(
+            tableIdentifier, identifier -> new 
SchemaAndPartitionSpecCacheItem(maxCacheSize));
+
+    SchemaCacheItem schemaCacheItem = 
cacheItem.getOrCreateSchema(avroSchemaId, avroSchema);
+
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    String partitionCols = getStringColumnValue(inputRecord, "partition_cols", 
null);
+    if (partitionCols != null) {
+      partitionSpec =
+          cacheItem.getOrCreatePartitionSpec(partitionCols, 
schemaCacheItem.tableSchema());
+    }
+
+    out.collect(
+        new DynamicRecord(
+            tableIdentifier,
+            branch,
+            schemaCacheItem.tableSchema(),
+            schemaCacheItem.variantRowDataWrapper().wrap(variantData),
+            partitionSpec,
+            distributionMode,
+            writeParallelism));
+  }
+
+  private String getStringColumnValue(RowData rowData, String col, String 
defaultValue) {
+    Integer pos = fieldNameToPosition.get(col);
+    if (pos != null) {
+      StringData value = rowData.getString(pos);
+      return value == null ? defaultValue : value.toString();
+    }
+
+    return defaultValue;
+  }
+
+  private String getStringColumnValue(RowData rowData, String col) {
+    return getStringColumnValue(rowData, col, null);
+  }
+
+  private static class SchemaAndPartitionSpecCacheItem {

Review Comment:
   ```suggestion
    static class SchemaAndPartitionSpecCacheItem {
   ```
   
   For testability.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/VariantAvroDynamicTableRecordGenerator.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Map;
+import org.apache.avro.Schema.Parser;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkCreateTableOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.data.VariantRowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+
+public class VariantAvroDynamicTableRecordGenerator extends 
DynamicTableRecordGenerator {
+  private transient Map<String, Integer> fieldNameToPosition;
+  private transient Map<TableIdentifier, SchemaAndPartitionSpecCacheItem> 
tableCache;
+  private static final Parser PARSER = new Parser();
+  private static final String DEFAULT_CACHE_MAX_SIZE = "100";
+  private static final Splitter COMMA = Splitter.on(',');
+  private transient int maxCacheSize;
+
+  public VariantAvroDynamicTableRecordGenerator(RowType rowType, Map<String, 
String> writeProps) {
+    super(rowType, writeProps);
+
+    String catalogDatabaseColumn = 
FlinkCreateTableOptions.CATALOG_DATABASE.key();
+    Preconditions.checkArgument(
+        rowType.getFieldIndex(catalogDatabaseColumn) != -1
+            || writeProps().containsKey(catalogDatabaseColumn),
+        "Invalid %s:null." + "Either %s column should be passed in Row or set 
in table options",
+        catalogDatabaseColumn,
+        catalogDatabaseColumn);
+
+    String catalogTableColumn = FlinkCreateTableOptions.CATALOG_TABLE.key();
+    Preconditions.checkArgument(
+        rowType.getFieldIndex(catalogTableColumn) != -1
+            || writeProps().containsKey(catalogTableColumn),
+        "Invalid %s:null." + "Either %s column should be passed in Row or set 
in table options",
+        catalogTableColumn,
+        catalogTableColumn);
+
+    validateRequiredFieldAndType("data", new VariantType(false));
+    validateRequiredFieldAndType("avro_schema", new VarCharType(false, 
Integer.MAX_VALUE));
+    validateRequiredFieldAndType("avro_schema_id", new VarCharType(false, 
Integer.MAX_VALUE));
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+
+    this.fieldNameToPosition = getFieldPositionIndex();
+
+    this.maxCacheSize =
+        Integer.parseInt(
+            writeProps().getOrDefault("schema-cache-max-size", 
DEFAULT_CACHE_MAX_SIZE));
+    this.tableCache = new LRUCache<>(maxCacheSize);
+  }
+
+  @Override
+  public void generate(RowData inputRecord, Collector<DynamicRecord> out) 
throws Exception {
+    String catalogDatabaseColumn = 
FlinkCreateTableOptions.CATALOG_DATABASE.key();
+    String catalogDb =
+        getStringColumnValue(
+            inputRecord, catalogDatabaseColumn, 
writeProps().get(catalogDatabaseColumn));
+
+    String catalogTableColumn = FlinkCreateTableOptions.CATALOG_TABLE.key();
+    String catalogTable =
+        getStringColumnValue(inputRecord, catalogTableColumn, 
writeProps().get(catalogTableColumn));
+
+    // All write options overrides should be inferred in DynamicIcebergSink
+    String branch = getStringColumnValue(inputRecord, 
FlinkWriteOptions.BRANCH.key());
+
+    DistributionMode distributionMode = null;
+    String distributionModeStr =
+        getStringColumnValue(inputRecord, 
FlinkWriteOptions.DISTRIBUTION_MODE.key());
+    if (distributionModeStr != null) {
+      distributionMode = DistributionMode.fromName(distributionModeStr);
+    }
+
+    int writeParallelism = -1;
+    Integer pos = 
fieldNameToPosition.get(FlinkWriteOptions.WRITE_PARALLELISM.key());
+    if (pos != null) {
+      writeParallelism = inputRecord.getInt(pos);
+    }
+
+    Variant variantData = 
inputRecord.getVariant(fieldNameToPosition.get("data"));
+    String avroSchema = getStringColumnValue(inputRecord, "avro_schema");
+    String avroSchemaId = getStringColumnValue(inputRecord, "avro_schema_id");
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(catalogDb, 
catalogTable);
+    SchemaAndPartitionSpecCacheItem cacheItem =
+        tableCache.computeIfAbsent(
+            tableIdentifier, identifier -> new 
SchemaAndPartitionSpecCacheItem(maxCacheSize));
+
+    SchemaCacheItem schemaCacheItem = 
cacheItem.getOrCreateSchema(avroSchemaId, avroSchema);
+
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    String partitionCols = getStringColumnValue(inputRecord, "partition_cols", 
null);
+    if (partitionCols != null) {
+      partitionSpec =
+          cacheItem.getOrCreatePartitionSpec(partitionCols, 
schemaCacheItem.tableSchema());
+    }

Review Comment:
   Looks like this feature (deriving the partition spec from the `partition_cols` 
column) is currently untested.



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java:
##########
@@ -399,6 +400,74 @@ public void testCreateDynamicIcebergSink() throws 
DatabaseAlreadyExistException
         .containsExactlyInAnyOrder(Row.of(1L, "AAA"), Row.of(2L, "BBB"), 
Row.of(3L, "CCC"));
   }
 
+  @TestTemplate
+  public void testVariantAvroDynamicIcebergSink() throws 
DatabaseAlreadyExistException {
+    Map<String, String> tableProps = createTableProps();
+    Map<String, String> dynamicTableProps = Maps.newHashMap(tableProps);
+    dynamicTableProps.put("use-dynamic-iceberg-sink", "true");
+    dynamicTableProps.put(
+        "dynamic-record-generator-impl", 
VariantAvroDynamicTableRecordGenerator.class.getName());
+
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    FlinkCatalog flinkCatalog =
+        (FlinkCatalog) factory.createCatalog(catalogName, tableProps, new 
Configuration());
+    flinkCatalog.createDatabase(
+        databaseName(), new CatalogDatabaseImpl(Maps.newHashMap(), null), 
true);
+
+    String avroSchema =
+        """
+      {
+        "type": "record",
+        "name": "TestSchema",
+        "fields": [
+          {
+            "name": "id",
+            "type": "long"
+          },
+          {
+            "name": "name",
+            "type": "string"
+          }
+        ]
+      }
+      """;
+
+    String avroSchemaId = "TestSchema:1";
+    // Create table with dynamic sink enabled
+    sql(
+        "CREATE TABLE %s (data VARIANT, `catalog-database` STRING, 
`catalog-table` STRING, avro_schema STRING, avro_schema_id STRING, branch 
STRING, `write-parallelism` INT) WITH %s",
+        TABLE_NAME + "_dynamic", toWithClause(dynamicTableProps));
+
+    // Insert data with database and table information
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(PARSE_JSON('{\"id\": 1, \"name\": \"AAA\"}'), '%s', '%s', 
'%s', '%s', 'main', 1), "
+            + "(PARSE_JSON('{\"id\": 2, \"name\": \"BBB\"}'), '%s', '%s', 
'%s', '%s', 'main', 1)",
+        TABLE_NAME + "_dynamic",
+        databaseName(),
+        tableName(),
+        avroSchema,
+        avroSchemaId,
+        databaseName(),
+        tableName(),
+        avroSchema,
+        avroSchemaId);
+
+    // Verify the table and data exists
+    ObjectPath objectPath = new ObjectPath(databaseName(), tableName());
+    assertThat(flinkCatalog.tableExists(objectPath)).isTrue();
+    Table table =
+        flinkCatalog
+            .getCatalogLoader()
+            .loadCatalog()
+            .loadTable(TableIdentifier.of(databaseName(), tableName()));
+
+    tableProps.put("catalog-database", databaseName());
+    sql("CREATE TABLE %s (id BIGINT, name STRING) WITH %s", tableName(), 
toWithClause(tableProps));
+    assertThat(sql("SELECT * FROM %s", tableName()))
+        .containsExactlyInAnyOrder(Row.of(1L, "AAA"), Row.of(2L, "BBB"));

Review Comment:
   Do we need to load the table on line 459? It looks like we already do that 
on lines 466-468, where we also verify the contents.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/VariantRowDataWrapper.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
+
+public class VariantRowDataWrapper implements RowData {
+
+  private final RowType rowType;
+  private RowKind rowKind;
+  private Variant variantData;
+
+  public VariantRowDataWrapper(RowType rowType) {
+    this(rowType, RowKind.INSERT);
+  }
+
+  public VariantRowDataWrapper(RowType rowType, RowKind rowKind) {
+    this.rowType = rowType;
+    this.rowKind = rowKind;
+  }
+
+  public VariantRowDataWrapper wrap(Variant variant) {
+    this.variantData = variant;
+    return this;
+  }
+
+  @Override
+  public int getArity() {
+    return rowType.getFieldCount();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return rowKind;
+  }
+
+  @Override
+  public void setRowKind(RowKind rowKind) {
+    this.rowKind = rowKind;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    Variant value = variantData.getField(fieldName);
+    return value == null || value.isNull();
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getBoolean();
+  }
+
+  @Override
+  public byte getByte(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getByte();
+  }
+
+  @Override
+  public short getShort(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getShort();
+  }
+
+  @Override
+  public int getInt(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return getIntValue(variantData.getField(fieldName));
+  }
+
+  @Override
+  public long getLong(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return getLongValue(variantData.getField(fieldName));
+  }
+
+  @Override
+  public float getFloat(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getFloat();
+  }
+
+  @Override
+  public double getDouble(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return getDoubleValue(variantData.getField(fieldName));
+  }
+
+  @Override
+  public StringData getString(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return isNullAt(pos)
+        ? null
+        : StringData.fromString(variantData.getField(fieldName).getString());
+  }
+
+  @Override
+  public DecimalData getDecimal(int pos, int precision, int scale) {
+    String fieldName = getFieldNameByIndex(pos);
+    return isNullAt(pos)
+        ? null
+        : DecimalData.fromBigDecimal(
+            variantData.getField(fieldName).getDecimal(), precision, scale);
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int precision) {
+    String fieldName = getFieldNameByIndex(pos);
+    return isNullAt(pos) ? null : 
getTimestamp(variantData.getField(fieldName));
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int pos) {
+    throw new UnsupportedOperationException("RawValue in Variant column is not 
supported.");
+  }
+
+  @Override
+  public byte[] getBinary(int pos) {
+    String fieldName = getFieldNameByIndex(pos);
+    return variantData.getField(fieldName).getBytes();
+  }
+
+  @Override
+  public ArrayData getArray(int pos) {
+    //    ArrayType arrayType = (ArrayType) rowType.getTypeAt(pos);
+    //    LogicalType elementType = arrayType.getElementType();
+    //
+    //    String fieldName = getFieldNameByIndex(pos);
+    //    Variant arrayVariant = variantData.getField(fieldName);
+    //
+    //    if (arrayVariant != null) {
+    //        
Preconditions.checkArgument(arrayVariant.getType().equals(Variant.Type.ARRAY),
+    //                "Expected Array type, but got " + 
arrayVariant.getType());
+    //        int arraySize = arrayVariant.getArraySize();
+    //        Object[] elements = new Object[arraySize];
+    //
+    //        for (int i=0;i< arraySize;i++) {
+    //            Variant element = arrayVariant.getElement(i);
+    //            elements[i] = element == null ? null : 
getElementValue(elementType, element);
+    //        }
+    //
+    //        return new GenericArrayData(elements);
+    //    }
+    //
+    //    return null;

Review Comment:
   Is this due to the missing Flink accessors? If so, let's add the relevant 
Flink source code in this PR. We will have to wait on the Flink release(s) 
anyway, and that will take time.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java:
##########
@@ -28,12 +33,44 @@
 public abstract class DynamicTableRecordGenerator implements 
DynamicRecordGenerator<RowData> {
 
   private final RowType rowType;
+  private final Map<String, String> writeProps;
 
-  public DynamicTableRecordGenerator(RowType rowType) {
+  public DynamicTableRecordGenerator(RowType rowType, Map<String, String> 
writeProps) {
     this.rowType = rowType;
+    this.writeProps = writeProps;
   }
 
-  protected RowType rowType() {
+  public RowType rowType() {
     return rowType;
   }
+
+  public Map<String, String> writeProps() {
+    return writeProps;
+  }
+
+  protected Map<String, Integer> getFieldPositionIndex() {

Review Comment:
   Rename to `positionByFieldName()`? Should this method be moved to 
`VariantAvroDynamicTableRecordGenerator` or a utility instead of the base class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to