kbendick commented on a change in pull request #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r761492796
##########
File path: core/src/test/java/org/apache/iceberg/hadoop/TestCachingCatalog.java
##########
@@ -123,4 +150,169 @@ public void testTableName() throws Exception {
Table snapshotsTable = catalog.loadTable(snapshotsTableIdent);
Assert.assertEquals("Name must match", "hadoop.db.ns1.ns2.tbl.snapshots",
snapshotsTable.name());
}
+
+ @Test
+ public void testTableExpiresAfterInterval() throws IOException {
+ TestableCachingCatalog catalog =
TestableCachingCatalog.wrap(hadoopCatalog(), EXPIRATION_TTL, ticker);
+
+ // Create the table and populate the catalog.
+ Namespace namespace = Namespace.of("db", "ns1", "ns2");
+ TableIdentifier tableIdent = TableIdentifier.of(namespace, "tbl");
+ catalog.createTable(tableIdent, SCHEMA, SPEC, ImmutableMap.of("key",
"value"));
+
+ // Ensure table is cached with full ttl remaining upon creation.
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
Assertions.assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
+
+ // Move time forward by half of the duration interval
+ ticker.advance(HALF_OF_EXPIRATION);
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
Assertions.assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+
+ // Move the ticker past the duration interval and then ensure that the
catalog is serving a new object for that
+ // identifier (even if the table that is loaded is functionally
equivalent).
+ ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
+
Assertions.assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ Assert.assertNotSame("CachingCatalog should return a new instance after
expiration",
+ table, catalog.loadTable(tableIdent));
+ }
+
+ @Test
+ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws
IOException {
+ TestableCachingCatalog catalog =
TestableCachingCatalog.wrap(hadoopCatalog(), EXPIRATION_TTL, ticker);
+
+ // Create the table and populate the catalog.
+ Namespace namespace = Namespace.of("db", "ns1", "ns2");
+ TableIdentifier tableIdent = TableIdentifier.of(namespace, "tbl");
+ catalog.createTable(tableIdent, SCHEMA, SPEC, ImmutableMap.of("key",
"value"));
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
Assertions.assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
+
+ // Move time forward without accessing the table from the cache and check
age of cached entry and remaining TTL
+ ticker.advance(HALF_OF_EXPIRATION);
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
Assertions.assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+
Assertions.assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+
+ // Move time forward a bit more
+ Duration oneMinute = Duration.ofMinutes(1L);
+ ticker.advance(oneMinute);
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
Assertions.assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION.plus(oneMinute));
+
Assertions.assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION.minus(oneMinute));
+
+ // Access the table via the catalog, which should refresh the TTL
+ Table table = catalog.loadTable(tableIdent);
+ assertCatalogEntryHasAge("After accessing the table via the catalog, its
entry in the cache should refresh its age",
+ catalog, tableIdent, Duration.ZERO);
+ assertCatalogEntryHasTtl("After accessing a table via the catalog, its
remaining ttl should reset",
+ catalog, tableIdent, EXPIRATION_TTL);
+
+ // Move forward, then access the table via the table object to ensure that
does not advance TTL
+ ticker.advance(HALF_OF_EXPIRATION);
+
+ // Access (but not mutate) via table object, not via the catalog itself.
+ table.refresh();
+ assertCatalogEntryHasAge("Accessing the table object directly should not
affect its cache age",
+ catalog, tableIdent, HALF_OF_EXPIRATION);
+ assertCatalogEntryHasTtl("After accessing the table object directly, its
cache ttl should not be affected",
+ catalog, tableIdent, HALF_OF_EXPIRATION);
+
+ // Table (and not catalog) access - but still not through the catalog
+ table.newAppend().appendFile(FILE_A).commit();
+ assertCatalogEntryHasAge("Mutating table operations shouldn't affect the
cached entry's age",
+ catalog, tableIdent, HALF_OF_EXPIRATION);
+ assertCatalogEntryHasTtl("Mutating table operations shouldn't affect the
cached entry's ttl",
+ catalog, tableIdent, HALF_OF_EXPIRATION);
+ }
+
+ @Test
+ public void testCacheExpirationEagerlyRemovesMetadataTables() throws
IOException {
+ TestableCachingCatalog catalog =
TestableCachingCatalog.wrap(hadoopCatalog(), EXPIRATION_TTL, ticker);
+
+ // Create the table and populate the catalog.
+ Namespace namespace = Namespace.of("db", "ns1", "ns2");
+ TableIdentifier tableIdent = TableIdentifier.of(namespace, "tbl");
+ Table table = catalog.createTable(tableIdent, SCHEMA, SPEC,
ImmutableMap.of("key2", "value2"));
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
+ // Write to the table. CachingCatalog should not expire its entry after
update.
+ table.newAppend().appendFile(FILE_A).commit();
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
Assertions.assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+
+ // Advance time by half of the expiration interval.
+ // We'll then load the metadata tables and ensure that they are expired
when the main table is expired,
+ // even if their own age is not up to the expiration interval.
+ //
+ // We want to ensure that all cached metadata tables are expired together.
+ ticker.advance(HALF_OF_EXPIRATION);
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertCatalogEntryHasAge("The caching catalog should record the correct
age as time advances",
+ catalog, tableIdent, HALF_OF_EXPIRATION);
+
+ // Load the metadata tables at time EXPIRATION_TTL / 2.
+ metadataTables(tableIdent).forEach(catalog::loadTable);
+ assertMetadataTablesAreCached(catalog, tableIdent);
+
+ // Accessing the metadata tables will also refresh the main table.
+ assertCatalogEntryHasTtl("Accessing the metadata tables should refresh
their origin table's time to live",
+ catalog, tableIdent, EXPIRATION_TTL);
+
+ // Sanity check - the cached metadata tables should have an age of ZERO.
+ metadataTables(tableIdent).forEach(metadataTbl ->
assertCatalogEntryHasAge("The metadata tables should have an age of zero on
load",
+ catalog, metadataTbl, Duration.ZERO));
+
+ // Move time forward now that metadata tables are loaded and then access
the data table again, to reset its value.
+ ticker.advance(HALF_OF_EXPIRATION);
+ metadataTables(tableIdent).forEach(catalog::loadTable);
+ metadataTables(tableIdent).forEach(metadataTbl ->
+ assertCatalogEntryHasAge("Reloading the metadata tables should reset
their age",
+ catalog, metadataTbl, Duration.ZERO));
+
+ // Sanity check. Because the table is still in the cache, its ttl isn't
updated on metadata table load
+ assertCatalogEntryHasAge("Loading metadata tables should not reload origin
table if its cached already",
+ catalog, tableIdent, HALF_OF_EXPIRATION);
+
+ // Move time forward the other half of the expiration interval
+ ticker.advance(HALF_OF_EXPIRATION);
+
Assertions.assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertCatalogHasExpiredMetadataTables(catalog, tableIdent);
+ }
+
+ // Move this method entirely in place.
+ public static void assertCatalogEntryHasAge(String assertionMessage,
TestableCachingCatalog catalog,
+ TableIdentifier identifier, Duration expectedAge) {
+ Assertions.assertThat(catalog.cache().asMap()).containsKey(identifier);
+ Assertions.assertThat(catalog.ageOf(identifier))
+ .isPresent().get().isEqualTo(expectedAge);
+ }
+
+ public static void assertCatalogEntryHasTtl(
+ String assertionMessage, TestableCachingCatalog catalog, TableIdentifier
ident, Duration expectedTimeLeft) {
+ Assert.assertEquals(assertionMessage,
+ Optional.ofNullable(expectedTimeLeft), catalog.remainingAgeFor(ident));
+ }
+
+ public static void assertMetadataTablesAreCached(TestableCachingCatalog
catalog, TableIdentifier tableIdentifier) {
+ metadataTables(tableIdentifier).forEach(
+ metadataTable ->
Assertions.assertThat(catalog.cache().asMap()).containsKey(metadataTable));
+ }
+
+ public static void assertCatalogHasExpiredMetadataTables(
+ TestableCachingCatalog catalog, TableIdentifier tableIdent) {
+    // Sanity check that the table itself is expired
+ Assert.assertNull("The table should not be served by the CachingCatalog's
cache",
+ catalog.tableFromCacheQuietly(tableIdent));
+ metadataTables(tableIdent)
+ .forEach(metadataTable ->
+ Assert.assertNull("The CachingCatalog should not return metadata
tables for a TTL'd table",
+ catalog.tableFromCacheQuietly(metadataTable)));
+ }
+
+ public static List<TableIdentifier> metadataTables(TableIdentifier
tableIdent) {
+ return Arrays.stream(MetadataTableType.values())
+ .map(type -> TableIdentifier.parse(tableIdent + "." +
type.name().toLowerCase(Locale.ROOT)))
+ .collect(Collectors.toList());
+ }
Review comment:
   I'm going to continue to inline most of these. I just wanted to get
them out of the old class, along with the assertion helpers, first and foremost.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]