swapna267 commented on code in PR #16450:
URL: https://github.com/apache/iceberg/pull/16450#discussion_r3409948183
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java:
##########
@@ -28,12 +34,61 @@
public abstract class DynamicTableRecordGenerator implements DynamicRecordGenerator<RowData> {
private final RowType rowType;
+ private final Configuration flinkConfiguration;
+ private final Map<String, String> writeProperties;
+ private final Map<String, Integer> fieldNameToPosition;
- public DynamicTableRecordGenerator(RowType rowType) {
+ public DynamicTableRecordGenerator(
+ RowType rowType, Map<String, String> writeProperties, Configuration flinkConfiguration) {
this.rowType = rowType;
+ this.writeProperties = writeProperties;
+ this.fieldNameToPosition = fieldNameToPositionMapping();
+ this.flinkConfiguration = flinkConfiguration;
}
protected RowType rowType() {
return rowType;
}
+
+ protected Map<String, String> writeProperties() {
+ return writeProperties;
+ }
+
+ protected Configuration flinkConfiguration() {
+ return flinkConfiguration;
+ }
Review Comment:
We can remove `flinkConfiguration()` and replace it with `DynamicSinkConf`.
However, we still use the write properties as the fallback when determining
the catalog database and catalog table overrides. We don't fall back to the
Flink configuration for those, which matches what `FlinkDynamicTableFactory`
does.
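
A rough sketch of the lookup described above, not the actual PR code: the override is read from the write properties only, and the Flink configuration is never consulted. The class name, the `resolve` helper, and the two key strings are hypothetical and exist only for illustration.

```java
import java.util.Map;

// Illustration only: names and keys below are assumptions, not Iceberg API.
class OverrideLookupSketch {
  // Hypothetical property keys for the catalog database/table overrides.
  static final String CATALOG_DATABASE_KEY = "catalog-database";
  static final String CATALOG_TABLE_KEY = "catalog-table";

  // Returns the override from the write properties if present, otherwise
  // the supplied default. There is deliberately no Flink configuration
  // parameter, so no fallback to it is possible here.
  static String resolve(Map<String, String> writeProperties, String key, String defaultValue) {
    String override = writeProperties.get(key);
    return override != null ? override : defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> writeProperties = Map.of(CATALOG_DATABASE_KEY, "db_override");
    // Key present in the write properties: the override wins.
    System.out.println(resolve(writeProperties, CATALOG_DATABASE_KEY, "default_db"));
    // Key absent from the write properties: the default is used.
    System.out.println(resolve(writeProperties, CATALOG_TABLE_KEY, "default_table"));
  }
}
```

With this shape, anything that must come from the Flink configuration would go through `DynamicSinkConf` instead, which keeps the two sources clearly separated.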
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]