aiborodin commented on code in PR #14406:
URL: https://github.com/apache/iceberg/pull/14406#discussion_r2587903118


##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java:
##########
@@ -87,8 +94,71 @@ void testCachingDisabled() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 0, Long.MAX_VALUE, Clock.systemUTC(), 10);
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
+
+  @Test
+  void testNoCacheRefreshBeforeRefreshIntervalElapses() {
+    // Create table
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA2);
+
+    // Create test clock
+    long firstEpochMillis = 1L;
+    MutableClock testClock = new MutableClock(ZoneOffset.UTC, firstEpochMillis);
+
+    // Init cache
+    long refreshIntervalMillis = 100L;
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, refreshIntervalMillis, testClock, 10);
+    cache.update(tableIdentifier, table);
+
+    // Cache schema
+    Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
+    assertThat(schema.sameSchema(SCHEMA2)).isTrue();
+
+    // Progress clock to timestamp before refresh interval
+    long secondEpochMilli = 2L;
+    testClock.setEpochMilli(secondEpochMilli);
+
+    // Cache schema with fewer fields
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
+    assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
+    assertThat(schemaInfo.compareResult())
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+
+    // Assert both schemas are in cache
+    assertThat(cache.getResolvedSchemaInfo(tableIdentifier, SCHEMA)).isNotNull();
+    assertThat(cache.getResolvedSchemaInfo(tableIdentifier, SCHEMA2)).isNotNull();
+  }
+
+  private static class MutableClock extends Clock {
+    private final ZoneId zoneId;
+    private long epochMilli;
+
+    MutableClock(ZoneId zoneId, long epochMilli) {
+      this.zoneId = zoneId;
+      this.epochMilli = epochMilli;
+    }
+
+    void setEpochMilli(long epochMilli) {
+      this.epochMilli = epochMilli;
+    }
+
+    public ZoneId getZone() {
+      return this.zoneId;
+    }
+
+    public Clock withZone(ZoneId zone) {
+      return zone.equals(this.zoneId) ? this : new MutableClock(zone, this.epochMilli);
+    }
+
+    public Instant instant() {

Review Comment:
   I replaced this class with `Clock.fixed()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to