aiborodin commented on code in PR #13340: URL: https://github.com/apache/iceberg/pull/13340#discussion_r2160852107
########## flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java: ########## @@ -220,37 +238,59 @@ SchemaInfo getSchemaInfo() { */ static class SchemaInfo { private final Map<Integer, Schema> schemas; - private final Map<Schema, Tuple2<Schema, CompareSchemasVisitor.Result>> lastResults; + private final Cache<Schema, SchemaCompareInfo> lastResults; - private SchemaInfo(Map<Integer, Schema> schemas) { + private SchemaInfo(Map<Integer, Schema> schemas, int inputSchemaCacheMaximumSize) { this.schemas = schemas; - this.lastResults = new LimitedLinkedHashMap<>(); + this.lastResults = + Caffeine.newBuilder() + .maximumSize(inputSchemaCacheMaximumSize) + .evictionListener(SchemaInfo::cacheEvictionListener) + .build(); + } + + private static void cacheEvictionListener( + Schema inputSchema, SchemaCompareInfo schemaCompareInfo, RemovalCause cause) { + if (cause == RemovalCause.SIZE) { + LOG.warn( + "Performance degraded as records with different schema is generated for the same table. " + + "Likely the DynamicRecord.schema is not reused. 
" + + "Reuse the same instance if the record schema is the same to improve performance"); + } } - private void update( - Schema newLastSchema, Tuple2<Schema, CompareSchemasVisitor.Result> newLastResult) { - lastResults.put(newLastSchema, newLastResult); + private void update(Schema newLastSchema, SchemaCompareInfo compareInfo) { + lastResults.put(newLastSchema, compareInfo); } @VisibleForTesting - Tuple2<Schema, CompareSchemasVisitor.Result> getLastResult(Schema schema) { - return lastResults.get(schema); + SchemaCompareInfo getLastResult(Schema schema) { + return lastResults.getIfPresent(schema); } } - @SuppressWarnings("checkstyle:IllegalType") - private static class LimitedLinkedHashMap<K, V> extends LinkedHashMap<K, V> { - @Override - protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { - boolean remove = size() > MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE; - if (remove) { - LOG.warn( - "Performance degraded as records with different schema is generated for the same table. " - + "Likely the DynamicRecord.schema is not reused. " - + "Reuse the same instance if the record schema is the same to improve performance"); - } + static class SchemaCompareInfo { + private final Schema tableSchema; + private final CompareSchemasVisitor.Result compareResult; + private final DataConverter converter; + + SchemaCompareInfo( + Schema tableSchema, CompareSchemasVisitor.Result compareResult, DataConverter converter) { + this.tableSchema = tableSchema; + this.compareResult = compareResult; + this.converter = converter; + } + + Schema tableSchema() { Review Comment: Acknowledged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org