moomindani commented on PR #14440:
URL: https://github.com/apache/iceberg/pull/14440#issuecomment-4621071011

   Thanks @gaborkaszab, that's a fair ask. Rather than hand-wave, I tried reproducing it end-to-end through Spark SQL and wanted to share what I found.
   
   Setup: three Iceberg catalogs over the same Hadoop warehouse and the same table: `writer` (cache disabled, standing in for an ingestion job that updates the table out-of-band), `acc` (`cache.expiration-interval-ms=3000`, today's default access-based policy), and `wrt` (`expire-after-write-interval-ms=3000`, the proposed policy). A reader runs `SELECT count(*)` every 300 ms. After `writer` commits an update:
   
   ```
   acc (expireAfterAccess=3000ms): stale for all 841 reads over a 300s window, never saw the update; count=1
   wrt (expireAfterWrite=3000ms): picked up the update after ~3.0s; count=2
   ```
   
   What stood out to me is that this isn't a transient lag. Because `expireAfterAccess` resets its timer on every read, a table that is read more often than the TTL seems to stay on the cached snapshot indefinitely (I tried 8 s, 60 s, and 300 s windows, and `acc` was stale for the whole of each). So the case I had in mind is a long-lived driver, such as a SQL endpoint or a streaming job enriching against a dimension table that an ingestion job updates continuously, that reads the table more often than the 30 s default while another job updates it.
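To make the reset behavior concrete, here is a toy Java model of a single cached entry under the two policies. It is a hypothetical simulation (`ExpirySim` and `firstMissMs` are made up for illustration), not Caffeine or `CachingCatalog`, and it ignores query latency; it only shows why reads spaced closer than the access TTL never let the entry expire, while a write TTL bounds the entry's age regardless of reads.

```java
/**
 * Toy model of one cached table entry (hypothetical; not Caffeine or CachingCatalog).
 * The entry is loaded at t=0 and read every readEveryMs. A TTL <= 0 turns that policy off.
 * Returns the time of the first read that finds the entry expired (and would reload
 * fresh metadata), or -1 if no read within windowMs does.
 */
final class ExpirySim {
  static long firstMissMs(long accessTtlMs, long writeTtlMs, long readEveryMs, long windowMs) {
    final long loadedAt = 0;
    long lastAccess = loadedAt;
    for (long t = readEveryMs; t <= windowMs; t += readEveryMs) {
      // expireAfterAccess: age measured from the most recent read.
      boolean accessExpired = accessTtlMs > 0 && t - lastAccess >= accessTtlMs;
      // expireAfterWrite: age measured from the load, no matter how often it is read.
      boolean writeExpired = writeTtlMs > 0 && t - loadedAt >= writeTtlMs;
      if (accessExpired || writeExpired) {
        return t;
      }
      lastAccess = t; // a cache hit refreshes the access timestamp
    }
    return -1;
  }

  public static void main(String[] args) {
    // Like `acc`: 3 s access TTL, reads every 300 ms for 300 s.
    System.out.println(firstMissMs(3_000, 0, 300, 300_000)); // prints -1
    // Like `wrt`: 30 s access TTL plus a 3 s write TTL.
    System.out.println(firstMissMs(30_000, 3_000, 300, 300_000)); // prints 3000
    // Reads spaced wider than the access TTL do eventually miss.
    System.out.println(firstMissMs(3_000, 0, 5_000, 60_000)); // prints 5000
  }
}
```

The model treats the two TTLs as independent conditions where either one expires the entry, which mirrors the `wrt` config (30 s access TTL plus 3 s write TTL); how the PR actually combines them in `CachingCatalog` is not modeled here.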
   
   On `refresh()`: it isn't unreachable. You can force a reload with `REFRESH TABLE` (I confirmed it does clear the cache; `acc` flipped to the new count right after), and an app holding a `Table` can call `refresh()` directly. The issue is that it's a manual, per-statement action, and in some common patterns issuing it isn't practical: you can't inject it into a running stream, and prefixing every statement in an automated `CTAS` / `INSERT … SELECT` / `MERGE INTO` pipeline (where a stale source read silently produces wrong output) isn't realistic. For those, today's fallbacks are disabling the cache (the ~2,250x-slower path) or restarting, and that is where an automatic `expireAfterWrite` bound helps.
   
   On the benchmark: agreed, the latency table is really about cost rather than staleness; the freshness section and this e2e cover the staleness side. I included the latency numbers mostly because turning the cache off is the workaround people reach for, and it's good to know what that costs.
   
   One thing I ran into: the write TTL isn't reachable from Spark yet. The PR adds it to `CachingCatalog`/`CatalogProperties`, but `SparkCatalog` still passes only the access interval. I wired `cache.expiration.expire-after-write-interval-ms` through `SparkCatalog` to run this e2e (diff below). @blcksrx, this might be worth folding into the PR (and probably into the Flink catalog too).
   
   On dev@: agreed, that's the right venue for wider input. @blcksrx, since it's your PR, I think it's best coming from you; feel free to use the e2e below as supporting material, and please add or reframe anything you'd like. Thanks again @gaborkaszab for the push; it was a good prompt to actually verify this.
   
   <details>
   <summary><code>SparkCatalog</code> wiring for 
<code>cache.expiration.expire-after-write-interval-ms</code></summary>
   
   ````diff
   @@ SparkCatalog.initialize @@
        long cacheExpirationIntervalMs =
            PropertyUtil.propertyAsLong(
                options,
                CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
                CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
    
   +    long cacheExpireAfterWriteIntervalMs =
   +        PropertyUtil.propertyAsLong(
   +            options,
   +            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS,
   +            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);
   +
        // An expiration interval of 0ms effectively disables caching.
        // Do not wrap with CachingCatalog.
   -    if (cacheExpirationIntervalMs == 0) {
   +    if (cacheExpirationIntervalMs == 0 && cacheExpireAfterWriteIntervalMs == 0) {
          cacheEnabled = false;
        }
   @@
        this.icebergCatalog =
            cacheEnabled
   -            ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs)
   +            ? CachingCatalog.wrap(
   +                catalog,
   +                cacheCaseSensitive,
   +                cacheExpirationIntervalMs,
   +                cacheExpireAfterWriteIntervalMs)
                : catalog;
   ````
   </details>
   
   <details>
   <summary>E2E test — <code>TestCacheStalenessE2E.java</code></summary>
   
   ```java
   // (Spark v4.1) Three catalogs over one warehouse: writer (no cache, out-of-band updater),
   // acc (expireAfterAccess=3s), wrt (expireAfterWrite=3s). Reader does SELECT count(*) every 300ms.
   // count(catalog) (SELECT count(*) on <catalog>.db.dim) and WINDOW_MS are defined elsewhere in the test class.
   @Test
   public void staleReadThroughSparkSql() throws Exception {
     spark.sql("CREATE NAMESPACE IF NOT EXISTS writer.db");
     spark.sql("CREATE TABLE writer.db.dim (id bigint, v string) USING iceberg");
     spark.sql("INSERT INTO writer.db.dim VALUES (1, 'v1')");
   
     assertThat(count("acc")).isEqualTo(1);   // warm caches
     assertThat(count("wrt")).isEqualTo(1);
   
     spark.sql("INSERT INTO writer.db.dim VALUES (2, 'v2')");   // out-of-band update
   
     // Poll until the window ends, recording when each catalog first sees the new row.
     long endNs = System.nanoTime() + WINDOW_MS * 1_000_000L;
     long accFlipMs = -1, wrtFlipMs = -1, startNs = System.nanoTime();
     while (System.nanoTime() < endNs) {
       long t = (System.nanoTime() - startNs) / 1_000_000L;
       if (accFlipMs < 0 && count("acc") == 2) accFlipMs = t;
       if (wrtFlipMs < 0 && count("wrt") == 2) wrtFlipMs = t;
       Thread.sleep(300);
     }
   
     spark.sql("REFRESH TABLE acc.db.dim");      // manual refresh clears the cache...
     assertThat(count("acc")).isEqualTo(2);      // ...so acc now sees the update
   
     assertThat(accFlipMs)
         .as("expireAfterAccess stays stale under continuous reads")
         .isEqualTo(-1);
     assertThat(wrtFlipMs)
         .as("expireAfterWrite picks up the update")
         .isGreaterThanOrEqualTo(0);
   }
   ```
   
   Catalog config: `writer` has caching disabled; `acc` = `cache.expiration-interval-ms=3000`; `wrt` = `cache.expiration-interval-ms=30000` + `cache.expiration.expire-after-write-interval-ms=3000`. The window is configurable (I ran 8 s, 60 s, and 300 s).
   </details>
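
For anyone re-running this outside the test harness, the catalog config described in the e2e details could be expressed as Spark session properties roughly as follows. This is a hypothetical helper (`StalenessCatalogConf` is made up) and it assumes a Hadoop catalog (`type=hadoop`) at a shared warehouse path. `cache-enabled` and `cache.expiration-interval-ms` are existing Iceberg catalog properties; the write-TTL key exists only with this PR, and `SparkCatalog` honors it only with the wiring diff above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: Spark conf entries for the three e2e catalogs (not part of the PR). */
final class StalenessCatalogConf {
  static Map<String, String> build(String warehouse) {
    Map<String, String> conf = new LinkedHashMap<>();
    for (String name : new String[] {"writer", "acc", "wrt"}) {
      String prefix = "spark.sql.catalog." + name;
      conf.put(prefix, "org.apache.iceberg.spark.SparkCatalog");
      conf.put(prefix + ".type", "hadoop"); // assumption: Hadoop catalog
      conf.put(prefix + ".warehouse", warehouse); // same warehouse, so all three see the same table
    }
    // writer: no caching; stands in for the out-of-band ingestion job.
    conf.put("spark.sql.catalog.writer.cache-enabled", "false");
    // acc: today's access-based expiry with a shortened TTL.
    conf.put("spark.sql.catalog.acc.cache.expiration-interval-ms", "3000");
    // wrt: default access TTL plus the proposed write TTL (key introduced by this PR).
    conf.put("spark.sql.catalog.wrt.cache.expiration-interval-ms", "30000");
    conf.put("spark.sql.catalog.wrt.cache.expiration.expire-after-write-interval-ms", "3000");
    return conf;
  }
}
```

Each entry would then be passed to `SparkSession.builder().config(key, value)` before `getOrCreate()`. Keeping the catalogs identical except for their cache settings is what makes any difference in staleness attributable to the expiry policy alone.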
   

