aiborodin commented on code in PR #13340:
URL: https://github.com/apache/iceberg/pull/13340#discussion_r2156515911


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java:
##########
@@ -31,10 +33,14 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.dynamic.convert.RowDataConverter;
 
 @Internal
 class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
     implements Collector<DynamicRecord> {
+  private static final int ROW_DATA_CONVERTER_CACHE_MAXIMUM_SIZE = 1000;

Review Comment:
   I added a configuration option for this cache. However, it feels like we should do this differently. I made a change locally, which I can raise as a PR, that merges the `RowDataConverter` cache into the `SchemaInfo` in the `TableMetadataCache`. With that design, each converter is stored alongside its source schema and the comparison result produced by the schema visitor. We could then make the static constant `TableMetadataCache.MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE` configurable to control the converter cache size, instead of the option added here.
   The only drawback of the second approach is that the cache would recompute the converters every `refreshMs` (1 second by default) for all operators. This probably adds only a small performance overhead, but I still need to confirm that.
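   For illustration, a rough sketch of that second approach is below. It is not the actual `TableMetadataCache`/`SchemaInfo` API: the names `ConverterCacheSketch` and `CachedEntry`, the integer schema-id key, and the `Object` placeholders are invented for the example; the real code would key entries by the input schema and hold the concrete comparison result and `RowDataConverter`.

```java
// Hypothetical sketch only -- not the existing TableMetadataCache/SchemaInfo classes.
// The idea: keep the converter next to the schema comparison result, bounded by a
// single configurable maximum size instead of a separate converter cache.
import java.util.LinkedHashMap;
import java.util.Map;

class ConverterCacheSketch {

  /** Placeholder pairing the schema-visitor comparison result with its converter. */
  static final class CachedEntry {
    final Object comparisonResult; // stands in for the visitor's comparison result
    final Object converter; // stands in for the RowDataConverter

    CachedEntry(Object comparisonResult, Object converter) {
      this.comparisonResult = comparisonResult;
      this.converter = converter;
    }
  }

  private final int maxSize; // would come from the configurable option
  private final Map<Integer, CachedEntry> entriesBySchemaId;

  ConverterCacheSketch(int maxSize) {
    this.maxSize = maxSize;
    // Access-ordered LinkedHashMap gives simple LRU eviction once maxSize is exceeded.
    this.entriesBySchemaId =
        new LinkedHashMap<Integer, CachedEntry>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<Integer, CachedEntry> eldest) {
            return size() > ConverterCacheSketch.this.maxSize;
          }
        };
  }

  CachedEntry get(int schemaId) {
    return entriesBySchemaId.get(schemaId);
  }

  void put(int schemaId, CachedEntry entry) {
    entriesBySchemaId.put(schemaId, entry);
  }
}
```

   Keeping the converters in one bounded structure like this avoids adding a second cache-size knob, which is the main motivation for folding the converter into `SchemaInfo`.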



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

