aiborodin commented on code in PR #13340:
URL: https://github.com/apache/iceberg/pull/13340#discussion_r2160852107


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java:
##########
@@ -220,37 +238,59 @@ SchemaInfo getSchemaInfo() {
    */
   static class SchemaInfo {
     private final Map<Integer, Schema> schemas;
-    private final Map<Schema, Tuple2<Schema, CompareSchemasVisitor.Result>> 
lastResults;
+    private final Cache<Schema, SchemaCompareInfo> lastResults;
 
-    private SchemaInfo(Map<Integer, Schema> schemas) {
+    private SchemaInfo(Map<Integer, Schema> schemas, int 
inputSchemaCacheMaximumSize) {
       this.schemas = schemas;
-      this.lastResults = new LimitedLinkedHashMap<>();
+      this.lastResults =
+          Caffeine.newBuilder()
+              .maximumSize(inputSchemaCacheMaximumSize)
+              .evictionListener(SchemaInfo::cacheEvictionListener)
+              .build();
+    }
+
+    private static void cacheEvictionListener(
+        Schema inputSchema, SchemaCompareInfo schemaCompareInfo, RemovalCause 
cause) {
+      if (cause == RemovalCause.SIZE) {
+        LOG.warn(
+            "Performance degraded as records with different schema is 
generated for the same table. "
+                + "Likely the DynamicRecord.schema is not reused. "
+                + "Reuse the same instance if the record schema is the same to 
improve performance");
+      }
     }
 
-    private void update(
-        Schema newLastSchema, Tuple2<Schema, CompareSchemasVisitor.Result> 
newLastResult) {
-      lastResults.put(newLastSchema, newLastResult);
+    private void update(Schema newLastSchema, SchemaCompareInfo compareInfo) {
+      lastResults.put(newLastSchema, compareInfo);
     }
 
     @VisibleForTesting
-    Tuple2<Schema, CompareSchemasVisitor.Result> getLastResult(Schema schema) {
-      return lastResults.get(schema);
+    SchemaCompareInfo getLastResult(Schema schema) {
+      return lastResults.getIfPresent(schema);
     }
   }
 
-  @SuppressWarnings("checkstyle:IllegalType")
-  private static class LimitedLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
-    @Override
-    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-      boolean remove = size() > MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE;
-      if (remove) {
-        LOG.warn(
-            "Performance degraded as records with different schema is 
generated for the same table. "
-                + "Likely the DynamicRecord.schema is not reused. "
-                + "Reuse the same instance if the record schema is the same to 
improve performance");
-      }
+  static class SchemaCompareInfo {
+    private final Schema tableSchema;
+    private final CompareSchemasVisitor.Result compareResult;
+    private final DataConverter converter;
+
+    SchemaCompareInfo(
+        Schema tableSchema, CompareSchemasVisitor.Result compareResult, 
DataConverter converter) {
+      this.tableSchema = tableSchema;
+      this.compareResult = compareResult;
+      this.converter = converter;
+    }
+
+    Schema tableSchema() {

Review Comment:
   Acknowledged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to