moomindani commented on PR #14440:
URL: https://github.com/apache/iceberg/pull/14440#issuecomment-4561571918

   @blcksrx — thanks for the earlier work here. The PR was auto-closed by the 
stale bot in March, but the committer feedback on the design was 
directional-positive 
([@gaborkaszab](https://github.com/apache/iceberg/pull/14440#issuecomment-3516831276),
 
[@findepi](https://github.com/apache/iceberg/pull/14440#issuecomment-3518104873),
 
[@pvary](https://github.com/apache/iceberg/pull/14440#issuecomment-3596157493)).
 I think the main thing missing was a concrete quantification of the impact — 
proposals without numbers are harder to prioritize. I cherry-picked your commit 
onto current `main` and ran a benchmark; sharing it here in case it's useful 
for reviving the PR.
   
   Happy to help drive this — see the "Path forward" section at the bottom.
   
   ---
   
   ## Bench setup
   
   | Item | Value |
   |---|---|
   | Iceberg base | `upstream/main` @ `d2cde2950` |
   | Cherry-pick | `857f17879` (this PR) |
   | Catalog | `HadoopCatalog`, local temp warehouse (no S3) |
   | Hardware | Apple Silicon (arm64), macOS 25.4 |
   | JDK | OpenJDK 17.0.18 (Homebrew) |
   | Gradle | 8.14.5 |
   | Caffeine | 2.9.3 (unchanged) |
   | Per cell | 5 iter x 100,000 `loadTable()` calls + 5,000 latency samples |
   | Freshness window | write-TTL = 200ms, 300ms of continuous reads after 
underlying commit |
   
   ## Reproduction
   
   ```bash
   git checkout upstream/main      # d2cde2950 at time of measurement
   git fetch upstream pull/14440/head:pr-14440
   git cherry-pick 857f17879
   # Resolve trivial conflicts in docs/tests/spark-v4.1 (not on cache path)
   # Add CachePolicyBench.java (full source in the collapsed section below)
   ./gradlew :iceberg-core:test --tests CachePolicyBench --rerun-tasks -i
   ```
   
   ## Results
   
   ### Per-call latency / throughput
   
   | policy | mean `loadTable()` | p50 | p99 | throughput |
   |---|---:|---:|---:|---:|
   | access-only (current default) | **0.07 us** | 0.04 us | 0.83 us | 13,721 
k/s |
   | cache-disabled (workaround) | **905.83 us** | 868.13 us | 1,338.25 us | 
1.1 k/s |
   | write-only (this PR, TTL=1h) | **0.44 us** | 0.42 us | 1.29 us | 2,248 k/s 
|
   
   - The cache-disabled workaround is **~2,250x slower** than the proposed 
write-only path
   - The write-only path adds only ~0.37 us absolute overhead vs. access-only — 
essentially free for the hit path
   
   ### Freshness check
   
   After committing a new snapshot via another writer, then performing 
continuous `loadTable()` calls for 300ms (longer than the configured 200ms 
write TTL):
   
   | policy | reads in 300ms | observed snapshot |
   |---|---:|---|
   | access-only (TTL=200ms) | 5,835,503 | **stale** — TTL never fires while 
reads continue |
   | write-only (TTL=200ms) | 800,529 | **refreshed** to new snapshot despite 
continuous reads |
   | cache-disabled | 349 | refreshed, but at 906 us/call |
   
   The `access-only` result reproduces precisely the staleness bug @Tommo56700 
reported in #14417.
   
   ### Streaming workload projection
   
   For a 1 Hz microbatch streaming job over 24 h, reading one cached dimension 
table:
   
   | policy | metadata overhead / day |
   |---|---:|
   | cache-disabled (current workaround) | 78.3 seconds |
   | write-only (this PR, TTL=1h) | 0.060 seconds |
   | **ratio** | **~1,301x cheaper** |
   
   On real cloud storage (S3/GCS), `loadTable()` cold cost is typically 50-200 
ms vs. ~1 ms locally. The absolute saving for production scales by that factor 
— order of **hours per day per cached table**.
   
   ## Caveats
   
   - HadoopCatalog on local FS underestimates absolute latency vs. S3/GCS by 
~50-200x. The ratios still hold.
   - This bench only exercises `CachingCatalog`. `SparkExecutorCache` (which 
@Tommo56700 asked about in the conversation above) was not measured here — 
Tommo's separate ask still applies, possibly as a follow-up PR.
   - Microbenchmark; real Spark Structured Streaming workloads will have 
additional overhead from task scheduling, planning, etc. The metadata-fetch 
overhead measured here is independent of those.
   
   ## Path forward
   
   A few options, in order of preference:
   
   1. **Revive this PR.** I'd be happy to help review subsequent revisions and 
repost the benchmark on the dev@ thread (the previous one apparently didn't 
reach all inboxes — see [@nastra's 
comment](https://github.com/apache/iceberg/pull/14440#issuecomment-3576285625) 
above). @blcksrx — if you have bandwidth to push another revision, the design 
already has committer buy-in.
   2. **Successor PR with attribution.** If you've moved on, I'm willing to 
open a successor PR (co-authored from your commit `857f17879`) carrying the 
same design forward, with clear attribution in the description.
   3. **Different design proposal.** If on reflection a different approach is 
preferred (e.g., per-table override, pluggable `Cache` factory as suggested in 
[this 
comment](https://github.com/apache/iceberg/issues/14417#issuecomment-3451805984)),
 the empirical case for *some* solution is still strong — happy to align.
   
   Let me know which path works for you. I'll also re-post these numbers to 
dev@ to try to revive the mailing list discussion.
   
   cc @Tommo56700 (original reporter), @gaborkaszab @findepi @pvary @nastra 
(previously engaged).
   
   <details>
   <summary>Full bench source (CachePolicyBench.java)</summary>
   
   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *   http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
   package org.apache.iceberg;
   
   import static org.apache.iceberg.types.Types.NestedField.required;
   
   import java.io.IOException;
   import java.nio.file.Path;
   import java.util.Arrays;
   import java.util.Locale;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.catalog.Catalog;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.hadoop.HadoopCatalog;
   import org.apache.iceberg.types.Types;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;
   
   /**
    * Empirical quantification of CachingCatalog expiration policy choices for 
issue #14417 /
    * PR #14440 (Hossein Torabi, blcksrx).
    */
   public class CachePolicyBench {
   
     private static final Schema SCHEMA = new Schema(required(1, "id", 
Types.LongType.get()));
     private static final TableIdentifier TABLE = TableIdentifier.of("ns", 
"ref_table");
   
     private static final int LOOP_ITERATIONS = 100_000;
     private static final int LATENCY_SAMPLES = 5_000;
     private static final int OUTER_ITER = 5;
     private static final long ACCESS_TTL_MS = 30_000;
     private static final long WRITE_TTL_MS = 3_600_000; // 1h
     private static final long DAY_SECS = 86_400;
   
     private static final long FRESHNESS_WRITE_TTL_MS = 200;
     private static final long FRESHNESS_SLEEP_MS = 300;
   
     @TempDir Path tempDir;
   
     @Test
     public void benchmark() throws Exception {
       HadoopCatalog hadoopCatalog = new HadoopCatalog();
       hadoopCatalog.setConf(new Configuration());
       hadoopCatalog.initialize("local", java.util.Map.of("warehouse", 
tempDir.toString()));
       
hadoopCatalog.createNamespace(org.apache.iceberg.catalog.Namespace.of("ns"));
       Table fresh = hadoopCatalog.createTable(TABLE, SCHEMA);
       appendOneFile(fresh);
   
       Catalog accessOnly = CachingCatalog.wrap(hadoopCatalog, ACCESS_TTL_MS, 
-1);
       Catalog writeOnly = CachingCatalog.wrap(hadoopCatalog, -1, WRITE_TTL_MS);
       Catalog raw = hadoopCatalog;
   
       System.out.println();
       System.out.println("=== Cache policy benchmark for #14417 / #14440 ===");
       System.out.println(
           "Local HadoopCatalog, " + OUTER_ITER + " iterations of " + 
LOOP_ITERATIONS
               + " loadTable() calls per cell");
       System.out.println();
       System.out.println(
           "| policy | mean loadTable (us) | p50 (us) | p99 (us) | throughput 
(k/s) |");
       System.out.println("|---|---:|---:|---:|---:|");
   
       double meanAccess = runAndReport("access-only (current default)", 
accessOnly);
       double meanDisabled = runAndReport("cache-disabled (workaround)", raw);
       double meanWriteOnly = runAndReport("write-only (proposed for 
streaming)", writeOnly);
   
       System.out.println();
       System.out.println(
           "## Freshness check (write-TTL=" + FRESHNESS_WRITE_TTL_MS + "ms, 
sleep="
               + FRESHNESS_SLEEP_MS + "ms between commit and re-read)");
       System.out.println();
       Catalog accessOnlyShort = CachingCatalog.wrap(hadoopCatalog, 
FRESHNESS_WRITE_TTL_MS, -1);
       Catalog writeOnlyShort = CachingCatalog.wrap(hadoopCatalog, -1, 
FRESHNESS_WRITE_TTL_MS);
       runFreshness("access-only (TTL=" + FRESHNESS_WRITE_TTL_MS + "ms)", 
accessOnlyShort, hadoopCatalog);
       runFreshness("write-only  (TTL=" + FRESHNESS_WRITE_TTL_MS + "ms)", 
writeOnlyShort, hadoopCatalog);
       runFreshness("cache-disabled               ", raw, hadoopCatalog);
   
       System.out.println();
       printStreamingProjection(meanAccess, meanDisabled, meanWriteOnly);
     }
   
     private void appendOneFile(Table table) {
       table
           .newAppend()
           .appendFile(
               org.apache.iceberg.DataFiles.builder(table.spec())
                   .withPath(tempDir.resolve("synthetic-" + System.nanoTime() + 
".parquet").toString())
                   .withFileSizeInBytes(10)
                   .withRecordCount(1)
                   .withFormat(FileFormat.PARQUET)
                   .build())
           .commit();
     }
   
     private double runAndReport(String label, Catalog catalog) {
       for (int i = 0; i < 5_000; i++) {
         catalog.loadTable(TABLE);
       }
       System.gc();
   
       double[] meansUs = new double[OUTER_ITER];
       long[] allSamples = new long[OUTER_ITER * LATENCY_SAMPLES];
       int samIdx = 0;
       for (int iter = 0; iter < OUTER_ITER; iter++) {
         long t0 = System.nanoTime();
         for (int i = 0; i < LOOP_ITERATIONS; i++) {
           catalog.loadTable(TABLE);
         }
         long elapsed = System.nanoTime() - t0;
         meansUs[iter] = (elapsed / 1000.0) / LOOP_ITERATIONS;
   
         for (int i = 0; i < LATENCY_SAMPLES; i++) {
           long s = System.nanoTime();
           catalog.loadTable(TABLE);
           allSamples[samIdx++] = System.nanoTime() - s;
         }
       }
       Arrays.sort(allSamples);
       double meanUs = Arrays.stream(meansUs).average().getAsDouble();
       long p50 = allSamples[allSamples.length / 2];
       long p99 = allSamples[(int) (allSamples.length * 0.99)];
       double throughputK = 1_000.0 / meanUs;
   
       System.out.printf(
           Locale.ROOT,
           "| %s | %.2f | %.2f | %.2f | %.0f |%n",
           label, meanUs, p50 / 1000.0, p99 / 1000.0, throughputK);
       return meanUs;
     }
   
     private void runFreshness(String label, Catalog catalog, HadoopCatalog 
underlying)
         throws IOException, InterruptedException {
       Table t1 = catalog.loadTable(TABLE);
       long initialSnap = t1.currentSnapshot() == null ? -1 : 
t1.currentSnapshot().snapshotId();
   
       Table direct = underlying.loadTable(TABLE);
       appendOneFile(direct);
       long expected = 
underlying.loadTable(TABLE).currentSnapshot().snapshotId();
   
       long endNs = System.nanoTime() + FRESHNESS_SLEEP_MS * 1_000_000L;
       long reads = 0;
       while (System.nanoTime() < endNs) {
         catalog.loadTable(TABLE);
         reads++;
       }
       Table t2 = catalog.loadTable(TABLE);
       long observed = t2.currentSnapshot() == null ? -1 : 
t2.currentSnapshot().snapshotId();
   
       String result =
           observed == expected
               ? "refreshed to new snapshot " + observed
               : "STALE: still returning " + observed + " (underlying is " + 
expected + ")";
       System.out.printf(
           "- %s: initial=%d, expected=%d after %d reads over %dms -> %s%n",
           label, initialSnap, expected, reads, FRESHNESS_SLEEP_MS, result);
     }
   
     private void printStreamingProjection(double accessUs, double disabledUs, 
double writeUs) {
       double disabledSecPerDay = DAY_SECS * (disabledUs / 1_000_000.0);
       double writeOnlyReloadsPerDay = DAY_SECS / (WRITE_TTL_MS / 1000.0);
       double writeOnlyReloadCost = disabledUs;
       double writeSecPerDay =
           (DAY_SECS - writeOnlyReloadsPerDay) * (writeUs / 1_000_000.0)
               + writeOnlyReloadsPerDay * (writeOnlyReloadCost / 1_000_000.0);
       double ratio = disabledSecPerDay / Math.max(writeSecPerDay, 1e-9);
       System.out.println("## Streaming workload projection");
       System.out.printf(
           Locale.ROOT,
           "1 Hz microbatch over 24h, local HadoopCatalog (S3/GCS will scale 
absolute up but ratio holds):%n");
       System.out.printf(
           Locale.ROOT,
           "- cache-disabled (workaround): %d calls x %.2fus ~ %.2f sec/day of 
metadata overhead%n",
           DAY_SECS, disabledUs, disabledSecPerDay);
       System.out.printf(
           Locale.ROOT,
           "- write-only (TTL=%dh): %.0f reloads x %.2fus + %.0f hits x %.2fus 
~ %.4f sec/day%n",
           WRITE_TTL_MS / 3_600_000,
           writeOnlyReloadsPerDay,
           writeOnlyReloadCost,
           DAY_SECS - writeOnlyReloadsPerDay,
           writeUs,
           writeSecPerDay);
       System.out.printf(Locale.ROOT, "- write-only is ~%.0fx cheaper than the 
workaround%n", ratio);
       System.out.printf(
           Locale.ROOT,
           "- on real cloud storage (S3 cold ~100ms), the per-reload cost is 
~%.0fx the local value%n",
           100_000.0 / disabledUs);
     }
   }
   ```
   
   </details>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to