moomindani commented on PR #14440:
URL: https://github.com/apache/iceberg/pull/14440#issuecomment-4621071011
Thanks @gaborkaszab, that's a fair ask. Rather than hand-wave, I reproduced it
end-to-end through Spark SQL; here is what I found.
Setup: three Iceberg catalogs over one Hadoop warehouse and one table:
`writer` (cache disabled, standing in for an ingestion job updating the table
out-of-band), `acc` (`cache.expiration-interval-ms=3000`, today's
expire-after-access policy), and `wrt`
(`cache.expiration.expire-after-write-interval-ms=3000`, the proposed policy).
A reader runs `SELECT count(*)` every 300 ms; after `writer` commits an update:
```
acc (expireAfterAccess=3000ms): stale for all 841 reads over a 300s window —
didn't see the update; count=1
wrt (expireAfterWrite=3000ms): picked up the update after ~3.0s; count=2
```
What stood out to me is that it isn't a transient lag — because
`expireAfterAccess` resets on every read, a table read more often than the TTL
seems to stay on the cached snapshot indefinitely (I tried 8 s / 60 s / 300 s
and `acc` was stale the whole time in each). So the case I had in mind is a
long-lived driver — a SQL endpoint, or a streaming job enriching against a
dimension table that's continuously updated by an ingestion job — reading more
often than the 30 s default while another job updates the table.
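To make the timer behaviour concrete, here is a minimal sketch of the two policies on a single cache entry. It is a toy model with a simulated clock, not Iceberg's `CachingCatalog` or any real cache library; the class name, the `Policy` enum, and `firstFreshReadMs` are made up for illustration.

```java
/** Toy model of one cached entry under two expiry policies (illustrative only). */
public class TtlPolicySketch {

  public enum Policy { EXPIRE_AFTER_ACCESS, EXPIRE_AFTER_WRITE }

  /**
   * Simulates a reader polling every readIntervalMs for windowMs. The entry was loaded
   * at time 0 with version 1, and the source moved to version 2 right afterwards.
   * Returns the time (ms) of the first read that sees version 2, or -1 if none does.
   */
  public static long firstFreshReadMs(
      Policy policy, long ttlMs, long readIntervalMs, long windowMs) {
    int sourceVersion = 2; // the out-of-band update has already landed
    int cachedVersion = 1; // the entry was loaded before the update
    long loadedAtMs = 0; // when the entry was last (re)loaded
    long lastAccessMs = 0; // when the entry was last read

    for (long now = readIntervalMs; now <= windowMs; now += readIntervalMs) {
      // Access-based expiry measures time since the last read;
      // write-based expiry measures time since the last load.
      long ageMs =
          policy == Policy.EXPIRE_AFTER_ACCESS ? now - lastAccessMs : now - loadedAtMs;
      if (ageMs >= ttlMs) {
        cachedVersion = sourceVersion; // entry expired: reload from the source
        loadedAtMs = now;
      }
      lastAccessMs = now; // every read resets the access timer
      if (cachedVersion == sourceVersion) {
        return now;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // Same shape as the e2e run: 3 s TTL, 300 ms polling, 300 s window.
    System.out.println(firstFreshReadMs(Policy.EXPIRE_AFTER_ACCESS, 3000, 300, 300_000));
    System.out.println(firstFreshReadMs(Policy.EXPIRE_AFTER_WRITE, 3000, 300, 300_000));
  }
}
```

With a 3000 ms TTL and 300 ms polling, the access-based entry never ages past 300 ms, so the call returns -1 for any window length, while the write-based entry reloads at 3000 ms. If the reader polls less often than the TTL (say every 4000 ms), the access-based entry also expires and the first read is fresh.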
On `refresh()` — it's not that it's unreachable: you can force a reload with
`REFRESH TABLE` (I confirmed it does clear the cache — `acc` flipped to the new
count right after), and an app holding a `Table` can call `refresh()` directly.
The issue is that it's a manual, per-statement action, and there are common
patterns where issuing it isn't practical — you can't inject it into a running
stream, and prefixing every statement in an automated `CTAS` / `INSERT …
SELECT` / `MERGE INTO` pipeline (where a stale source read silently produces
wrong output) isn't realistic. For those, the fallbacks today are disabling the
cache (the ~2,250x-slower path) or restarting — which is where an automatic
`expireAfterWrite` bound helps.
On the benchmark — agreed, the latency table is really about cost rather
than staleness; the freshness section and this e2e are the staleness side. I
included the latency mostly because turning the cache off is the workaround
people reach for, and it's good to know what that costs.
One thing I ran into: the write-TTL isn't reachable from Spark yet — the PR
adds it to `CachingCatalog`/`CatalogProperties`, but `SparkCatalog` still
passes only the access interval. I wired
`cache.expiration.expire-after-write-interval-ms` through `SparkCatalog` to run
this e2e (diff below). @blcksrx, might be worth folding into the PR (and
probably the Flink catalog too).
On dev@ — agreed that's the right venue for wider input. @blcksrx, since
it's your PR, I think it's best coming from you; the e2e below is yours to use
as supporting material, and please add or reframe anything you'd like. Thanks
again @gaborkaszab for the push — it was a good prompt to actually verify.
<details>
<summary><code>SparkCatalog</code> wiring for
<code>cache.expiration.expire-after-write-interval-ms</code></summary>
````diff
@@ SparkCatalog.initialize @@
     long cacheExpirationIntervalMs =
         PropertyUtil.propertyAsLong(
             options,
             CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
             CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+    long cacheExpireAfterWriteIntervalMs =
+        PropertyUtil.propertyAsLong(
+            options,
+            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS,
+            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);
+
     // An expiration interval of 0ms effectively disables caching.
     // Do not wrap with CachingCatalog.
-    if (cacheExpirationIntervalMs == 0) {
+    if (cacheExpirationIntervalMs == 0 && cacheExpireAfterWriteIntervalMs == 0) {
       cacheEnabled = false;
     }
@@
     this.icebergCatalog =
         cacheEnabled
-            ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs)
+            ? CachingCatalog.wrap(
+                catalog,
+                cacheCaseSensitive,
+                cacheExpirationIntervalMs,
+                cacheExpireAfterWriteIntervalMs)
             : catalog;
````
</details>
<details>
<summary>E2E test — <code>TestCacheStalenessE2E.java</code></summary>
```java
// (Spark v4.1) three catalogs over one warehouse: writer (no cache, out-of-band updater),
// acc (expireAfterAccess=3s), wrt (expireAfterWrite=3s). Reader does SELECT count(*) every 300ms.
@Test
public void staleReadThroughSparkSql() throws Exception {
  spark.sql("CREATE NAMESPACE IF NOT EXISTS writer.db");
  spark.sql("CREATE TABLE writer.db.dim (id bigint, v string) USING iceberg");
  spark.sql("INSERT INTO writer.db.dim VALUES (1, 'v1')");
  assertThat(count("acc")).isEqualTo(1); // warm caches
  assertThat(count("wrt")).isEqualTo(1);
  spark.sql("INSERT INTO writer.db.dim VALUES (2, 'v2')"); // out-of-band update
  long endNs = System.nanoTime() + WINDOW_MS * 1_000_000L;
  long accFlipMs = -1, wrtFlipMs = -1, startNs = System.nanoTime();
  while (System.nanoTime() < endNs) {
    long t = (System.nanoTime() - startNs) / 1_000_000L;
    if (accFlipMs < 0 && count("acc") == 2) accFlipMs = t;
    if (wrtFlipMs < 0 && count("wrt") == 2) wrtFlipMs = t;
    Thread.sleep(300);
  }
  spark.sql("REFRESH TABLE acc.db.dim"); // manual refresh clears the cache...
  assertThat(count("acc")).isEqualTo(2); // ...so acc now sees the update
  assertThat(accFlipMs).as("expireAfterAccess stays stale under continuous reads").isEqualTo(-1);
  assertThat(wrtFlipMs).as("expireAfterWrite picks up the update").isGreaterThanOrEqualTo(0);
}
```
Catalog config: `acc` = `cache.expiration-interval-ms=3000`; `wrt` =
`cache.expiration-interval-ms=30000` +
`cache.expiration.expire-after-write-interval-ms=3000`. Window is configurable
(ran 8 s / 60 s / 300 s).
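
For anyone reproducing the setup, the three catalogs could be expressed as Spark conf entries roughly like this. This is a sketch under assumptions: the warehouse path is a placeholder, `cache-enabled=false` is one way to disable the cache for `writer` (the run above only says the cache was disabled), and the write-interval key only takes effect with this PR plus the `SparkCatalog` wiring above. The class and method names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds Spark conf entries for the writer / acc / wrt catalogs (illustrative only). */
public class CatalogConfSketch {

  // Placeholder warehouse location; the path used in the original run is not given.
  static final String WAREHOUSE = "file:///tmp/iceberg-warehouse";

  public static Map<String, String> sparkConf() {
    Map<String, String> conf = new LinkedHashMap<>();
    // All three catalogs point at the same Hadoop warehouse, so they see the same table.
    for (String name : new String[] {"writer", "acc", "wrt"}) {
      String prefix = "spark.sql.catalog." + name;
      conf.put(prefix, "org.apache.iceberg.spark.SparkCatalog");
      conf.put(prefix + ".type", "hadoop");
      conf.put(prefix + ".warehouse", WAREHOUSE);
    }
    // writer: no caching (assumed to be done via cache-enabled=false).
    conf.put("spark.sql.catalog.writer.cache-enabled", "false");
    // acc: expire-after-access with a 3 s interval.
    conf.put("spark.sql.catalog.acc.cache.expiration-interval-ms", "3000");
    // wrt: 30 s access interval plus the proposed 3 s expire-after-write bound.
    conf.put("spark.sql.catalog.wrt.cache.expiration-interval-ms", "30000");
    conf.put(
        "spark.sql.catalog.wrt.cache.expiration.expire-after-write-interval-ms", "3000");
    return conf;
  }

  public static void main(String[] args) {
    // Print as key=value lines, e.g. to paste into spark-defaults.conf or --conf flags.
    sparkConf().forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```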
</details>