gaborkaszab commented on code in PR #14398:
URL: https://github.com/apache/iceberg/pull/14398#discussion_r2523167058


##########
core/src/main/java/org/apache/iceberg/rest/HTTPClient.java:
##########
@@ -326,6 +326,13 @@ protected <T extends RESTResponse> T execute(
 
       // Skip parsing the response stream for any successful request not 
expecting a response body
       if (emptyBody(response, responseType)) {
+        if (response.getCode() == HttpStatus.SC_NOT_MODIFIED

Review Comment:
   Added a comment. Let me know what you think.



##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -402,8 +492,26 @@ public Table loadTable(SessionContext context, 
TableIdentifier identifier) {
     MetadataTableType metadataType;
     LoadTableResponse response;
     TableIdentifier loadedIdent;
+
+    Map<String, String> responseHeaders = Maps.newHashMap();
+    TableWithETag cachedTable =
+        tableCache.getIfPresent(SessionIDTableID.of(context.sessionId(), 
identifier));
+
     try {
-      response = loadInternal(context, identifier, snapshotMode);
+      response =
+          loadInternal(
+              context,
+              identifier,
+              snapshotMode,
+              headersForLoadTable(cachedTable),
+              responseHeaders::putAll);
+
+      if (response == null) {
+        Preconditions.checkNotNull(cachedTable, "Invalid response: null");

Review Comment:
   I went for `Invalid load table response: null`. I found it unnecessary to 
mention that it's from the rest server.



##########
core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.iceberg.io.FileIO;
+
+class TestableRESTSessionCatalog extends RESTSessionCatalog {

Review Comment:
   See above comment



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3066,6 +3238,528 @@ public void testCommitStateUnknownNotReconciled() {
         .satisfies(ex -> assertThat(((CommitStateUnknownException) 
ex).getSuppressed()).isEmpty());
   }
 
+  @Test
+  public void testInvalidTableCacheParameters() {
+    RESTCatalog catalog = new RESTCatalog(config -> new 
RESTCatalogAdapter(backendCatalog));
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid max entries: negative");
+  }
+
+  @Test
+  public void testFreshnessAwareLoading() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+    assertThat(tableAfterFirstLoad.operations().current().location())
+        .isEqualTo(tableAfterSecondLoad.operations().current().location());
+    assertThat(
+            tableCache
+                .asMap()
+                .get(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE))
+                .table()
+                .operations()
+                .current()
+                .metadataFileLocation())
+        
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingMetadataTables() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    BaseMetadataTable metadataTable =
+        (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+    assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(table).isNotEqualTo(metadataTable.table());
+    assertThat(table.operations().current().metadataFileLocation())
+        
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  @Test
+  public void testRenameTableInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) ->
+            catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), 
"other_table")),
+        0);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDropTableInvalidatesTable(boolean purge) {
+    runTableInvalidationTest(
+        restCatalog, adapterForRESTServer, (catalog) -> 
catalog.dropTable(TABLE, purge), 0);
+  }
+
+  @Test
+  public void testTableExistViaHeadRequestInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        ((catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          catalog.tableExists(TABLE);

Review Comment:
   Sure. Added the check to all the other similar tests too



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3066,6 +3238,528 @@ public void testCommitStateUnknownNotReconciled() {
         .satisfies(ex -> assertThat(((CommitStateUnknownException) 
ex).getSuppressed()).isEmpty());
   }
 
+  @Test
+  public void testInvalidTableCacheParameters() {
+    RESTCatalog catalog = new RESTCatalog(config -> new 
RESTCatalogAdapter(backendCatalog));
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid max entries: negative");
+  }
+
+  @Test
+  public void testFreshnessAwareLoading() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+    assertThat(tableAfterFirstLoad.operations().current().location())
+        .isEqualTo(tableAfterSecondLoad.operations().current().location());
+    assertThat(
+            tableCache
+                .asMap()
+                .get(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE))
+                .table()
+                .operations()
+                .current()
+                .metadataFileLocation())
+        
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingMetadataTables() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    BaseMetadataTable metadataTable =
+        (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+    assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(table).isNotEqualTo(metadataTable.table());
+    assertThat(table.operations().current().metadataFileLocation())
+        
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  @Test
+  public void testRenameTableInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) ->
+            catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), 
"other_table")),
+        0);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDropTableInvalidatesTable(boolean purge) {
+    runTableInvalidationTest(
+        restCatalog, adapterForRESTServer, (catalog) -> 
catalog.dropTable(TABLE, purge), 0);
+  }
+
+  @Test
+  public void testTableExistViaHeadRequestInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        ((catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          catalog.tableExists(TABLE);
+        }),
+        0);
+  }
+
+  @Test
+  public void testTableExistViaGetRequestInvalidatesTable() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    // Configure REST server to answer tableExists query via GET
+    Mockito.doAnswer(
+            invocation ->
+                ConfigResponse.builder()
+                    .withEndpoints(
+                        ImmutableList.of(
+                            Endpoint.V1_LOAD_TABLE,
+                            Endpoint.V1_CREATE_NAMESPACE,
+                            Endpoint.V1_CREATE_TABLE))
+                    .build())
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, ResourcePaths.config()),
+            eq(ConfigResponse.class),
+            any(),
+            any());
+
+    RESTCatalog catalog = new RESTCatalog(defaultSessionContext, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    runTableInvalidationTest(
+        catalog,
+        adapter,
+        (cat) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          cat.tableExists(TABLE);
+        },
+        1);
+  }
+
+  @Test
+  public void testLoadTableInvalidatesCache() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          assertThatThrownBy(() -> catalog.loadTable(TABLE))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+  }
+
+  @Test
+  public void testLoadTableWithMetadataTableNameInvalidatesCache() {
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  private void runTableInvalidationTest(
+      RESTCatalog catalog,
+      RESTCatalogAdapter adapterToVerify,
+      Consumer<RESTCatalog> action,
+      int loadTableCountFromAction) {
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
catalog.sessionCatalog().tableCache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    action.accept(catalog);
+
+    // Check that 'action' invalidates cache
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    assertThatThrownBy(() -> catalog.loadTable(TABLE))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist: %s", TABLE);
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    expectFullTableLoadForLoadTable(TABLE, adapterToVerify);
+
+    BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE);
+
+    
assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(newTableWithSameName).isNotEqualTo(originalTable);
+    
assertThat(newTableWithSameName.operations().current().metadataFileLocation())
+        
.isNotEqualTo(originalTable.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testTableCacheWithMultiSessions() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> 
adapter, null);
+    sessionCatalog.initialize("test_session_catalog", Map.of());
+
+    SessionCatalog.SessionContext otherSessionContext =
+        new SessionCatalog.SessionContext(
+            "session_id_2", "user", ImmutableMap.of("credential", 
"user:12345"), ImmutableMap.of());
+
+    sessionCatalog.createNamespace(defaultSessionContext, TABLE.namespace());
+
+    sessionCatalog.buildTable(defaultSessionContext, TABLE, SCHEMA).create();
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(defaultSessionContext, TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
sessionCatalog.tableCache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(otherSessionContext, TABLE);
+
+    assertThat(tableCache.asMap())
+        .containsOnlyKeys(
+            SessionIDTableID.of(defaultSessionContext.sessionId(), TABLE),
+            SessionIDTableID.of(otherSessionContext.sessionId(), TABLE));
+  }
+
+  @Test
+  public void testTableCacheWithTableCommit() {
+    runTestWhereTableOpsDoesNotChangeTableCache(
+        table -> table.newAppend().appendFile(FILE_A).commit(), 2);
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingWithTableRefresh() {
+    runTestWhereTableOpsDoesNotChangeTableCache(
+        table -> {
+          RESTCatalog catalogToChangeTable = catalog(new 
RESTCatalogAdapter(backendCatalog));
+          
catalogToChangeTable.loadTable(TABLE).newAppend().appendFile(FILE_A).commit();
+
+          table.refresh();
+        },
+        1);
+  }
+
+  private void runTestWhereTableOpsDoesNotChangeTableCache(
+      Consumer<BaseTable> action, int loadTableCountFromAction) {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+
+    BaseTable tableInCacheBeforeCommit =
+        tableCache
+            .asMap()
+            .get(SessionIDTableID.of(defaultSessionContext.sessionId(), TABLE))
+            .table();
+
+    action.accept(table);
+
+    BaseTable tableInCacheAfterCommit =
+        tableCache
+            .asMap()
+            .get(SessionIDTableID.of(defaultSessionContext.sessionId(), TABLE))
+            .table();
+
+    assertThat(tableInCacheAfterCommit).isEqualTo(tableInCacheBeforeCommit);
+    
assertThat(tableInCacheAfterCommit.operations().current().metadataFileLocation())
+        .isNotEqualTo(table.operations().current().metadataFileLocation());
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    catalog().loadTable(TABLE);
+
+    Mockito.verify(adapterForRESTServer, times(2 + loadTableCountFromAction))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void test304NotModifiedResponseWithEmptyTableCache() {
+    Mockito.doAnswer(invocation -> null)
+        .when(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    catalog().invalidateTable(TABLE);
+
+    // Table is not in the cache and null LoadTableResponse is received
+    assertThatThrownBy(() -> catalog().loadTable(TABLE))
+        .isInstanceOf(RESTException.class)
+        .hasMessage(
+            "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s",
+            HTTPMethod.GET, RESOURCE_PATHS.table(TABLE));
+  }
+
+  @Test
+  public void testTableCacheNotUpdatedWithoutETag() {
+    RESTCatalogAdapter adapter =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders) {
+                // Wrap the original responseHeaders to not accept ETag.
+                Consumer<Map<String, String>> noETagConsumer =
+                    headers -> {
+                      if (!headers.containsKey(HttpHeaders.ETAG)) {
+                        responseHeaders.accept(headers);
+                      }
+                    };
+                return super.execute(request, responseType, errorHandler, 
noETagConsumer);
+              }
+            });
+
+    RESTCatalog catalog = new RESTCatalog(defaultSessionContext, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    catalog.loadTable(TABLE);

Review Comment:
   In previous PRs I got comments to arrange tests like this. This might also 
follow the pattern used in this file too. No strong preference, though.



##########
core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java:
##########
@@ -69,19 +72,39 @@ public RESTCatalog(Function<Map<String, String>, 
RESTClient> clientBuilder) {
   public RESTCatalog(
       SessionCatalog.SessionContext context,
       Function<Map<String, String>, RESTClient> clientBuilder) {
-    this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
+    this(context, clientBuilder, null);
+  }
+
+  @VisibleForTesting
+  RESTCatalog(
+      SessionCatalog.SessionContext context,
+      Function<Map<String, String>, RESTClient> clientBuilder,
+      BiFunction<SessionCatalog.SessionContext, Map<String, String>, FileIO> 
ioBuilder) {
+    this.sessionCatalog = createSessionCatalog(clientBuilder, ioBuilder);
     this.delegate = sessionCatalog.asCatalog(context);
     this.nsDelegate = (SupportsNamespaces) delegate;
     this.context = context;
     this.viewSessionCatalog = sessionCatalog.asViewCatalog(context);
   }
 
+  @VisibleForTesting
+  RESTSessionCatalog createSessionCatalog(

Review Comment:
   This isn't a constructor, but the function introduced to create the session 
catalog. This is overridden to be able to create 'Testable' version of the 
session catalog.



##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -495,7 +642,9 @@ public Catalog.TableBuilder buildTable(
   }
 
   @Override
-  public void invalidateTable(SessionContext context, TableIdentifier ident) {}
+  public void invalidateTable(SessionContext context, TableIdentifier ident) {
+    tableCache.invalidate(SessionIDTableID.of(context.sessionId(), ident));

Review Comment:
   you're right. I think it's `Id`. Changed these to reflect that



##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -413,14 +521,34 @@ public Table loadTable(SessionContext context, 
TableIdentifier identifier) {
         // attempt to load a metadata table using the identifier's namespace 
as the base table
         TableIdentifier baseIdent = 
TableIdentifier.of(identifier.namespace().levels());
         try {
-          response = loadInternal(context, baseIdent, snapshotMode);
+          responseHeaders.clear();
+          cachedTable =
+              tableCache.getIfPresent(SessionIDTableID.of(context.sessionId(), 
baseIdent));
+
+          response =
+              loadInternal(
+                  context,
+                  baseIdent,
+                  snapshotMode,
+                  headersForLoadTable(cachedTable),
+                  responseHeaders::putAll);
+
+          if (response == null) {
+            Preconditions.checkNotNull(cachedTable, "Invalid response: null");

Review Comment:
   done



##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -256,9 +292,46 @@ public void initialize(String name, Map<String, String> 
unresolved) {
             mergedProps,
             RESTCatalogProperties.METRICS_REPORTING_ENABLED,
             RESTCatalogProperties.METRICS_REPORTING_ENABLED_DEFAULT);
+
+    this.tableCache = tableCacheBuilder(mergedProps).build();
+
     super.initialize(name, mergedProps);
   }
 
+  @VisibleForTesting
+  Caffeine<Object, Object> tableCacheBuilder(Map<String, String> props) {

Review Comment:
   I just grepped for `Caffeine<` and apparently <Object, Object> is how we 
create these builders. I can change this, but I'd rather follow the pattern.



##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -458,20 +586,39 @@ public Table loadTable(SessionContext context, 
TableIdentifier identifier) {
             tableMetadata,
             endpoints);
 
-    trackFileIO(ops);
-
     BaseTable table =
         new BaseTable(
             ops,
             fullTableName(finalIdentifier),
             metricsReporter(paths.metrics(finalIdentifier), tableClient));
+
+    String eTag = responseHeaders.getOrDefault(HttpHeaders.ETAG, null);
+    if (eTag != null) {
+      BaseTable clonedTable = cloneTable(table);
+
+      trackFileIO((RESTTableOperations) clonedTable.operations());
+
+      tableCache.put(
+          SessionIDTableID.of(context.sessionId(), finalIdentifier),
+          TableWithETag.of(clonedTable, eTag));
+    } else {
+      trackFileIO(ops);
+    }
+
     if (metadataType != null) {
       return MetadataTableUtils.createMetadataTableInstance(table, 
metadataType);
     }
 
     return table;
   }
 
+  private BaseTable cloneTable(BaseTable originalTable) {

Review Comment:
   Just made some measurement on this and apparently, this has no impact on 
runtime (loaded a table of 1000 cols, 1000 snapshots and called loadTable() in 
a loop). Note, not the whole object tree is cloned, simple the BaseTable and 
TableOperations objects, but the underlying TableMetadata, etc are the same 
objects. Added a comment about this for more clarity.



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -2930,17 +2945,14 @@ public void testNotModified() {
             any(),
             any());
 
-    // TODO: This won't throw when client side of freshness-aware loading is 
implemented
-    assertThatThrownBy(() -> 
catalog().loadTable(TABLE)).isInstanceOf(NullPointerException.class);
+    catalog().loadTable(TABLE);
 
     TableIdentifier metadataTableIdentifier =
         TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
 
-    // TODO: This won't throw when client side of freshness-aware loading is 
implemented
-    assertThatThrownBy(() -> catalog().loadTable(metadataTableIdentifier))
-        .isInstanceOf(NullPointerException.class);
+    catalog().loadTable(metadataTableIdentifier);
 
-    Mockito.verify(adapterForRESTServer, times(2))
+    Mockito.verify(adapterForRESTServer, times(3))

Review Comment:
   Previously this test used mocking to fill in the IF_NONE_MATCH header. Now, 
the mocking is gone but we need the table to be in the cache so that we can 
fill that header, so I added an extra loadTable() call to the beginning of the 
test to do that. This check is for counting the loadTable() calls, so there is 
one more now.



##########
core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.io.FileIO;
+
+class TestableRESTCatalog extends RESTCatalog {

Review Comment:
   No plan to use this for other purposes for now. I found TestRESTCatalog.java 
still way too long, hence I decided to put this into a separate file. Let me 
know if you still think I should move it there.



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -117,6 +135,13 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
   private Server httpServer;
   private RESTCatalogAdapter adapterForRESTServer;
 
+  private final SessionCatalog.SessionContext defaultSessionContext =

Review Comment:
   yes, done



##########
core/src/test/java/org/apache/iceberg/io/FileIOTrackerTestUtil.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import org.apache.iceberg.TableOperations;
+
+public abstract class FileIOTrackerTestUtil {

Review Comment:
   This new class is part of the dependency PR. Should we discuss it 
[there](https://github.com/apache/iceberg/pull/14378)?



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3066,6 +3238,528 @@ public void testCommitStateUnknownNotReconciled() {
         .satisfies(ex -> assertThat(((CommitStateUnknownException) 
ex).getSuppressed()).isEmpty());
   }
 
+  @Test
+  public void testInvalidTableCacheParameters() {
+    RESTCatalog catalog = new RESTCatalog(config -> new 
RESTCatalogAdapter(backendCatalog));
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid max entries: negative");
+  }
+
+  @Test
+  public void testFreshnessAwareLoading() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+    assertThat(tableAfterFirstLoad.operations().current().location())
+        .isEqualTo(tableAfterSecondLoad.operations().current().location());
+    assertThat(
+            tableCache
+                .asMap()
+                .get(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE))
+                .table()
+                .operations()
+                .current()
+                .metadataFileLocation())
+        
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingMetadataTables() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    BaseMetadataTable metadataTable =
+        (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+    assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(table).isNotEqualTo(metadataTable.table());
+    assertThat(table.operations().current().metadataFileLocation())
+        
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  @Test
+  public void testRenameTableInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) ->
+            catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), 
"other_table")),
+        0);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDropTableInvalidatesTable(boolean purge) {
+    runTableInvalidationTest(
+        restCatalog, adapterForRESTServer, (catalog) -> 
catalog.dropTable(TABLE, purge), 0);
+  }
+
+  @Test
+  public void testTableExistViaHeadRequestInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        ((catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          catalog.tableExists(TABLE);
+        }),
+        0);
+  }
+
+  @Test
+  public void testTableExistViaGetRequestInvalidatesTable() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    // Configure REST server to answer tableExists query via GET
+    Mockito.doAnswer(
+            invocation ->
+                ConfigResponse.builder()
+                    .withEndpoints(
+                        ImmutableList.of(
+                            Endpoint.V1_LOAD_TABLE,
+                            Endpoint.V1_CREATE_NAMESPACE,
+                            Endpoint.V1_CREATE_TABLE))
+                    .build())
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, ResourcePaths.config()),
+            eq(ConfigResponse.class),
+            any(),
+            any());
+
+    RESTCatalog catalog = new RESTCatalog(defaultSessionContext, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    runTableInvalidationTest(
+        catalog,
+        adapter,
+        (cat) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          cat.tableExists(TABLE);

Review Comment:
   done



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3066,6 +3238,528 @@ public void testCommitStateUnknownNotReconciled() {
         .satisfies(ex -> assertThat(((CommitStateUnknownException) 
ex).getSuppressed()).isEmpty());
   }
 
+  @Test
+  public void testInvalidTableCacheParameters() {
+    RESTCatalog catalog = new RESTCatalog(config -> new 
RESTCatalogAdapter(backendCatalog));
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid max entries: negative");
+  }
+
+  @Test
+  public void testFreshnessAwareLoading() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+    assertThat(tableAfterFirstLoad.operations().current().location())
+        .isEqualTo(tableAfterSecondLoad.operations().current().location());
+    assertThat(
+            tableCache
+                .asMap()
+                .get(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE))
+                .table()
+                .operations()
+                .current()
+                .metadataFileLocation())
+        
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingMetadataTables() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    BaseMetadataTable metadataTable =
+        (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+    assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(table).isNotEqualTo(metadataTable.table());
+    assertThat(table.operations().current().metadataFileLocation())
+        
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  @Test
+  public void testRenameTableInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) ->
+            catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), 
"other_table")),
+        0);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDropTableInvalidatesTable(boolean purge) {
+    runTableInvalidationTest(
+        restCatalog, adapterForRESTServer, (catalog) -> 
catalog.dropTable(TABLE, purge), 0);
+  }
+
+  @Test
+  public void testTableExistViaHeadRequestInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        ((catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          catalog.tableExists(TABLE);
+        }),
+        0);
+  }
+
+  @Test
+  public void testTableExistViaGetRequestInvalidatesTable() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    // Configure REST server to answer tableExists query via GET
+    Mockito.doAnswer(
+            invocation ->
+                ConfigResponse.builder()
+                    .withEndpoints(
+                        ImmutableList.of(
+                            Endpoint.V1_LOAD_TABLE,
+                            Endpoint.V1_CREATE_NAMESPACE,
+                            Endpoint.V1_CREATE_TABLE))
+                    .build())
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, ResourcePaths.config()),
+            eq(ConfigResponse.class),
+            any(),
+            any());
+
+    RESTCatalog catalog = new RESTCatalog(defaultSessionContext, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    runTableInvalidationTest(
+        catalog,
+        adapter,
+        (cat) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          cat.tableExists(TABLE);
+        },
+        1);
+  }
+
+  @Test
+  public void testLoadTableInvalidatesCache() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          assertThatThrownBy(() -> catalog.loadTable(TABLE))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+  }
+
+  @Test
+  public void testLoadTableWithMetadataTableNameInvalidatesCache() {
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  private void runTableInvalidationTest(
+      RESTCatalog catalog,
+      RESTCatalogAdapter adapterToVerify,
+      Consumer<RESTCatalog> action,
+      int loadTableCountFromAction) {
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
catalog.sessionCatalog().tableCache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    action.accept(catalog);
+
+    // Check that 'action' invalidates cache
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    assertThatThrownBy(() -> catalog.loadTable(TABLE))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist: %s", TABLE);
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    expectFullTableLoadForLoadTable(TABLE, adapterToVerify);
+
+    BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE);
+
+    
assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(newTableWithSameName).isNotEqualTo(originalTable);
+    
assertThat(newTableWithSameName.operations().current().metadataFileLocation())
+        
.isNotEqualTo(originalTable.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testTableCacheWithMultiSessions() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> 
adapter, null);
+    sessionCatalog.initialize("test_session_catalog", Map.of());
+
+    SessionCatalog.SessionContext otherSessionContext =
+        new SessionCatalog.SessionContext(
+            "session_id_2", "user", ImmutableMap.of("credential", 
"user:12345"), ImmutableMap.of());
+
+    sessionCatalog.createNamespace(defaultSessionContext, TABLE.namespace());
+
+    sessionCatalog.buildTable(defaultSessionContext, TABLE, SCHEMA).create();
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(defaultSessionContext, TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
sessionCatalog.tableCache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(otherSessionContext, TABLE);
+
+    assertThat(tableCache.asMap())
+        .containsOnlyKeys(
+            SessionIDTableID.of(defaultSessionContext.sessionId(), TABLE),
+            SessionIDTableID.of(otherSessionContext.sessionId(), TABLE));
+  }
+
+  @Test
+  public void testTableCacheWithTableCommit() {
+    runTestWhereTableOpsDoesNotChangeTableCache(
+        table -> table.newAppend().appendFile(FILE_A).commit(), 2);
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingWithTableRefresh() {

Review Comment:
   Removed



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3066,6 +3238,528 @@ public void testCommitStateUnknownNotReconciled() {
         .satisfies(ex -> assertThat(((CommitStateUnknownException) 
ex).getSuppressed()).isEmpty());
   }
 
+  @Test
+  public void testInvalidTableCacheParameters() {
+    RESTCatalog catalog = new RESTCatalog(config -> new 
RESTCatalogAdapter(backendCatalog));
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid expire after write: zero or negative");
+
+    assertThatThrownBy(
+            () ->
+                catalog.initialize(
+                    "test", 
Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid max entries: negative");
+  }
+
+  @Test
+  public void testFreshnessAwareLoading() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer);
+
+    BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad);
+    assertThat(tableAfterFirstLoad.operations().current().location())
+        .isEqualTo(tableAfterSecondLoad.operations().current().location());
+    assertThat(
+            tableCache
+                .asMap()
+                .get(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE))
+                .table()
+                .operations()
+                .current()
+                .metadataFileLocation())
+        
.isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testFreshnessAwareLoadingMetadataTables() {
+    catalog().createNamespace(TABLE.namespace());
+
+    catalog().createTable(TABLE, SCHEMA);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
restCatalog.sessionCatalog().tableCache();
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    BaseTable table = (BaseTable) catalog().loadTable(TABLE);
+
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    BaseMetadataTable metadataTable =
+        (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier);
+
+    assertThat(tableCache.stats().hitCount()).isEqualTo(1);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(table).isNotEqualTo(metadataTable.table());
+    assertThat(table.operations().current().metadataFileLocation())
+        
.isEqualTo(metadataTable.table().operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterForRESTServer, times(2))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  @Test
+  public void testRenameTableInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) ->
+            catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), 
"other_table")),
+        0);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDropTableInvalidatesTable(boolean purge) {
+    runTableInvalidationTest(
+        restCatalog, adapterForRESTServer, (catalog) -> 
catalog.dropTable(TABLE, purge), 0);
+  }
+
+  @Test
+  public void testTableExistViaHeadRequestInvalidatesTable() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        ((catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          catalog.tableExists(TABLE);
+        }),
+        0);
+  }
+
+  @Test
+  public void testTableExistViaGetRequestInvalidatesTable() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    // Configure REST server to answer tableExists query via GET
+    Mockito.doAnswer(
+            invocation ->
+                ConfigResponse.builder()
+                    .withEndpoints(
+                        ImmutableList.of(
+                            Endpoint.V1_LOAD_TABLE,
+                            Endpoint.V1_CREATE_NAMESPACE,
+                            Endpoint.V1_CREATE_TABLE))
+                    .build())
+        .when(adapter)
+        .execute(
+            reqMatcher(HTTPMethod.GET, ResourcePaths.config()),
+            eq(ConfigResponse.class),
+            any(),
+            any());
+
+    RESTCatalog catalog = new RESTCatalog(defaultSessionContext, config -> 
adapter);
+    catalog.initialize(
+        "catalog",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    runTableInvalidationTest(
+        catalog,
+        adapter,
+        (cat) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          cat.tableExists(TABLE);
+        },
+        1);
+  }
+
+  @Test
+  public void testLoadTableInvalidatesCache() {
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          assertThatThrownBy(() -> catalog.loadTable(TABLE))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+  }
+
+  @Test
+  public void testLoadTableWithMetadataTableNameInvalidatesCache() {
+    TableIdentifier metadataTableIdentifier =
+        TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), 
"partitions");
+
+    runTableInvalidationTest(
+        restCatalog,
+        adapterForRESTServer,
+        (catalog) -> {
+          // Use a different catalog to drop the table
+          catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, 
true);
+
+          // This catalog still has the table in cache
+          assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier))
+              .isInstanceOf(NoSuchTableException.class)
+              .hasMessage("Table does not exist: %s", TABLE);
+        },
+        1);
+
+    Mockito.verify(adapterForRESTServer)
+        .execute(
+            reqMatcher(HTTPMethod.GET, 
RESOURCE_PATHS.table(metadataTableIdentifier)),
+            any(),
+            any(),
+            any());
+  }
+
+  private void runTableInvalidationTest(
+      RESTCatalog catalog,
+      RESTCatalogAdapter adapterToVerify,
+      Consumer<RESTCatalog> action,
+      int loadTableCountFromAction) {
+    catalog.createNamespace(TABLE.namespace());
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
catalog.sessionCatalog().tableCache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    action.accept(catalog);
+
+    // Check that 'action' invalidates cache
+    assertThat(tableCache.estimatedSize()).isZero();
+
+    assertThatThrownBy(() -> catalog.loadTable(TABLE))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist: %s", TABLE);
+
+    catalog.createTable(TABLE, SCHEMA);
+
+    expectFullTableLoadForLoadTable(TABLE, adapterToVerify);
+
+    BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE);
+
+    
assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction);
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    assertThat(newTableWithSameName).isNotEqualTo(originalTable);
+    
assertThat(newTableWithSameName.operations().current().metadataFileLocation())
+        
.isNotEqualTo(originalTable.operations().current().metadataFileLocation());
+
+    Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction))
+        .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), 
any(), any(), any());
+  }
+
+  @Test
+  public void testTableCacheWithMultiSessions() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> 
adapter, null);
+    sessionCatalog.initialize("test_session_catalog", Map.of());
+
+    SessionCatalog.SessionContext otherSessionContext =
+        new SessionCatalog.SessionContext(
+            "session_id_2", "user", ImmutableMap.of("credential", 
"user:12345"), ImmutableMap.of());
+
+    sessionCatalog.createNamespace(defaultSessionContext, TABLE.namespace());
+
+    sessionCatalog.buildTable(defaultSessionContext, TABLE, SCHEMA).create();
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(defaultSessionContext, TABLE);
+
+    Cache<SessionIDTableID, TableWithETag> tableCache = 
sessionCatalog.tableCache();
+    assertThat(tableCache.stats().hitCount()).isZero();
+    assertThat(tableCache.asMap())
+        
.containsOnlyKeys(SessionIDTableID.of(defaultSessionContext.sessionId(), 
TABLE));
+
+    expectFullTableLoadForLoadTable(TABLE, adapter);
+
+    sessionCatalog.loadTable(otherSessionContext, TABLE);
+
+    assertThat(tableCache.asMap())
+        .containsOnlyKeys(
+            SessionIDTableID.of(defaultSessionContext.sessionId(), TABLE),
+            SessionIDTableID.of(otherSessionContext.sessionId(), TABLE));
+  }
+
+  @Test
+  public void testTableCacheWithTableCommit() {

Review Comment:
   Agree, this was meant to be a way of documenting that updating the cache is 
not yet supported through table ops. Removing these tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to