[
https://issues.apache.org/jira/browse/CASSANDRA-21452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088327#comment-18088327
]
Jon Haddad commented on CASSANDRA-21452:
----------------------------------------
WIP branch:
[https://github.com/rustyrazorblade/cassandra/tree/cursor-compaction-completion]
This is still in progress: part of it is done, and I expect to have the rest
wrapped up soon.
h2. The differential harness
{{DifferentialCompactionTester}} (test/unit/.../db/compaction/differential/)
compacts the same input sstables through both production pipelines and asserts
the outputs are byte-for-byte identical. Both runs go through the real
{{CompactionTask.execute()}} with only the pipeline selection flipped — no
hand-driven internals — so what is verified is exactly what production
executes. Inputs are bit-identical across the two runs: the first run keeps its
originals, outputs are identified by descriptor diff, and the live set is
restored from the original files between runs. Both runs receive the same
{{gcBefore}}, so purge decisions cannot differ by clock. Before the cursor run,
the harness asserts the same {{isSupported()}} check production uses — a
scenario that would silently fall back to the iterator fails loudly instead of
passing vacuously.
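A minimal sketch of the byte-identity comparison, for readers who want the idea without opening the branch. {{ByteIdentityCheck}}, {{load}}, and {{diff}} are hypothetical names, not the harness's API. The sketch assumes component names have already been normalized so the two runs' files line up by name, and it reads whole files into memory, which the real harness could not do for multi-gigabyte outputs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Stream;

/** Hypothetical helper: compares two sets of output components byte for byte. */
final class ByteIdentityCheck
{
    /** Reads every regular file in a directory into a name -> bytes map. */
    static Map<String, byte[]> load(Path dir)
    {
        Map<String, byte[]> components = new TreeMap<>();
        try (Stream<Path> files = Files.list(dir))
        {
            for (Path p : (Iterable<Path>) files::iterator)
                if (Files.isRegularFile(p))
                    components.put(p.getFileName().toString(), Files.readAllBytes(p));
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return components;
    }

    /** Returns one line per divergence; an empty list means byte-identical. */
    static List<String> diff(Map<String, byte[]> iteratorOut,
                             Map<String, byte[]> cursorOut,
                             Set<String> allowlist)
    {
        Set<String> names = new TreeSet<>(iteratorOut.keySet());
        names.addAll(cursorOut.keySet());
        List<String> problems = new ArrayList<>();
        for (String name : names)
        {
            // Only components with an explicit allowlist entry may diverge.
            if (allowlist.contains(name))
                continue;
            byte[] a = iteratorOut.get(name);
            byte[] b = cursorOut.get(name);
            if (a == null || b == null)
            {
                problems.add(name + ": present in only one output");
                continue;
            }
            int at = Arrays.mismatch(a, b); // -1 when the arrays are equal
            if (at >= 0)
                problems.add(name + ": first difference at byte " + at);
        }
        return problems;
    }
}
```

An empty allowlist matches the bar described above: every component present in either output must exist in both and match exactly.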
Comparison is layered: byte equality of every output component (any component
permitted to diverge needs an explicitly justified allowlist entry — none have
been needed), a canonical sstabledump walk and a stats-metadata summary beneath
it for debugging, and extended {{IVerifier}} verification of every output.
Every scenario then runs a *second generation*: the cursor's own output is
committed and re-compacted through both paths, so write-side corruption that
only the next merge can observe fails in the suite rather than in production's
next compaction. A separate set of allocation gates measures warmed compactions
at two data scales with {{ThreadMXBean.getThreadAllocatedBytes}} and fails if
allocation scales with rows, sparse rows, complex cells, markers, or input
bytes — the garbage-free property is enforced by measurement, with JFR
diagnostics alongside for attribution.
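The allocation gate can be pictured roughly as follows. This is a sketch under assumptions, not the branch's code: {{AllocationGate}} and its methods are hypothetical, the warm-up count and slack are arbitrary, and the cast to {{com.sun.management.ThreadMXBean}} assumes a HotSpot-style JVM that supports per-thread allocation accounting.

```java
import java.lang.management.ManagementFactory;
import java.util.function.IntConsumer;

/** Hypothetical gate: fails when allocation grows with the workload's scale. */
final class AllocationGate
{
    // Assumption: the platform bean implements the com.sun.management extension.
    private static final com.sun.management.ThreadMXBean THREADS =
        (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();

    /** Bytes allocated by the calling thread while running the workload once. */
    static long allocatedBytes(IntConsumer workload, int scale)
    {
        long id = Thread.currentThread().getId();
        long before = THREADS.getThreadAllocatedBytes(id);
        workload.accept(scale);
        return THREADS.getThreadAllocatedBytes(id) - before;
    }

    /** True if the large run allocates at most slackBytes more than the small run. */
    static boolean isScaleIndependent(IntConsumer workload, int small, int large, long slackBytes)
    {
        // Warm up so class loading and one-time setup are not charged to the measurement.
        for (int i = 0; i < 5; i++)
        {
            workload.accept(small);
            workload.accept(large);
        }
        long atSmall = allocatedBytes(workload, small);
        long atLarge = allocatedBytes(workload, large);
        return atLarge - atSmall <= slackBytes;
    }
}
```

Comparing two scales rather than asserting an absolute budget keeps fixed per-compaction setup costs out of the verdict; only allocation that grows with the data trips the gate.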
Usage: each suite is a normal JUnit class, e.g. {{ant testsome
-Dtest.name=org.apache.cassandra.db.compaction.differential.EdgeCaseDifferentialCompactionTest}}.
The randomized soak generates schemas/workloads filtered by the production
support matrix (it widens automatically as gaps close); every failure message
carries a seed, and {{-Dtest.jvm.args="-Dcassandra.test.differential.seed=N"}}
replays it as the first example. Scale scenarios take property knobs the same
way (row counts, sstable counts, value sizes, column counts) so arbitrary sizes
can be spun up without code changes. Writing a new scenario is: write CQL,
flush between rounds, call {{assertCursorMatchesIteratorAcrossGenerations(cfs,
ALLOWLIST)}}.
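The seed-replay mechanism might look something like this. Only the property name comes from the text above; {{ReplayableSeed}} and its methods are hypothetical, and the real soak's generator is certainly richer than a plain {{java.util.Random}}.

```java
import java.util.Random;

/** Hypothetical sketch of seed handling for a randomized test. */
final class ReplayableSeed
{
    // Property name as quoted in the usage notes above.
    static final String PROPERTY = "cassandra.test.differential.seed";

    /** The replay seed if the property is set, otherwise a fresh one. */
    static long resolve()
    {
        Long replay = Long.getLong(PROPERTY);
        return replay != null ? replay : System.nanoTime();
    }

    /** Failure text that always carries the seed needed to reproduce it. */
    static String failure(long seed, String detail)
    {
        return detail + " (replay with -D" + PROPERTY + "=" + seed + ")";
    }

    /** Stand-in for workload generation: fully determined by the seed. */
    static long[] workload(long seed, int size)
    {
        return new Random(seed).longs(size).toArray();
    }
}
```

Because every random choice derives from the one seed, rerunning with the reported value regenerates the same schema and workload.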
Coverage: tombstone interleavings (cell/row/range/partition/complex deletions,
open-ended and overlapping ranges, boundary markers, resurrection), TTL shapes
(row vs cell TTLs, expired-to-tombstone, TTLs inside collections), timestamp
ties (live/live, expiring/live, delete/write, ties inside complex columns),
row-liveness shapes (UPDATE-built rows, liveness-only rows), statics including
static complex columns and empty static rows, frozen and multi-cell collections
and UDTs, nested types, vectors and durations, >64-column subset encodings at
the wire-format mode boundary, materialized-view tables (strict liveness,
view-partition moves), partial-set compactions (purge-overlap against
non-participants), multi-output writer switching, exact purge boundaries,
index-block boundaries, compressors and direct I/O, Harry-driven probabilistic
histories, and scale: 2M rows across 20 input sstables, ~2,000-column tables,
and single partitions past the 2GiB intra-partition offset boundary — all of it
in both sstable formats.
h2. What was added
*Multi-cell collections and UDTs (read / merge / write).* Reading: the cell
loop gains a per-column remaining-cells counter and consumes the complex column
header (optional deletion + cell count) on column entry; cell paths are read
into a grow-only byte[] scratch instead of materializing {{CellPath}} objects.
Merging: a path-ordered N-way merge nested inside the existing column merge —
sources are path-sorted on disk, so equal paths meet at the heads and resolve
through the same cell-reconciliation rules as simple cells; the per-column
deletion is merged newest-wins, shadowed by any active range/partition deletion
before purge evaluation, and shadows cells at or below its timestamp. Path
comparison is type-aware (map key type, set element type, {{TimeUUID}} for
lists, unsigned-short field index for UDTs) over reusable buffer windows.
Writing is the part worth a close look: a complex column's cell count is only
known after the merge, and the row-level {{HAS_COMPLEX_DELETION}} flag is only
known after every column has merged — so cells stream into the row buffer as
they win, a small reusable marker array records each complex column's start
offset, cell count, and deletion, and {{writeRowEnd}} assembles the final cell
section, splicing {{[deletion][count]}} in at each marker and deciding the row
flag before any byte hits the data file. Rows without complex columns keep the
original direct path, enforced by byte identity.
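To make the write-side splice concrete, here is a simplified sketch. It is not the branch's writer: {{DeferredComplexHeaders}} is a hypothetical class, the flag bit and sentinel are illustrative, headers use fixed-width fields rather than Cassandra's real encodings, and it allocates the output array per row where the real path would reuse buffers. It also assumes that once the row flag is set, every complex column in the row writes a deletion field.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch: stream cells first, splice complex-column headers at row end. */
final class DeferredComplexHeaders
{
    static final long NO_DELETION = Long.MIN_VALUE; // illustrative sentinel
    static final byte HAS_COMPLEX_DELETION = 0x40;  // illustrative flag bit

    private byte[] cells = new byte[64];            // grow-only cell buffer
    private int length;
    private int[] starts = new int[4];              // per marker: offset into cells
    private int[] counts = new int[4];              // per marker: cells written so far
    private long[] deletions = new long[4];         // per marker: merged deletion
    private int markers;
    private boolean inComplex;

    void beginComplexColumn(long deletion)
    {
        if (markers == starts.length)
        {
            starts = Arrays.copyOf(starts, markers * 2);
            counts = Arrays.copyOf(counts, markers * 2);
            deletions = Arrays.copyOf(deletions, markers * 2);
        }
        starts[markers] = length;
        counts[markers] = 0;
        deletions[markers] = deletion;
        markers++;
        inComplex = true;
    }

    void endComplexColumn()
    {
        inComplex = false;
    }

    /** Appends an already-encoded cell; counts it if a complex column is open. */
    void appendCell(byte[] encoded)
    {
        if (length + encoded.length > cells.length)
            cells = Arrays.copyOf(cells, Math.max(cells.length * 2, length + encoded.length));
        System.arraycopy(encoded, 0, cells, length, encoded.length);
        length += encoded.length;
        if (inComplex)
            counts[markers - 1]++;
    }

    /** Decides the row flag, then emits [flag][cells with headers spliced at each marker]. */
    byte[] finishRow()
    {
        boolean anyDeletion = false;
        for (int i = 0; i < markers; i++)
            anyDeletion |= deletions[i] != NO_DELETION;

        int headerSize = (anyDeletion ? Long.BYTES : 0) + Integer.BYTES;
        ByteBuffer out = ByteBuffer.allocate(1 + length + markers * headerSize);
        out.put((byte) (anyDeletion ? HAS_COMPLEX_DELETION : 0));

        int copied = 0;
        for (int i = 0; i < markers; i++)
        {
            out.put(cells, copied, starts[i] - copied); // cells before this column
            copied = starts[i];
            if (anyDeletion)
                out.putLong(deletions[i]);              // [deletion]
            out.putInt(counts[i]);                      // [count]
        }
        out.put(cells, copied, length - copied);        // remaining cells

        length = 0;
        markers = 0;
        inComplex = false;
        return out.array();
    }
}
```

The point of the marker array is that neither the per-column count nor the row flag has to be known when a cell is appended; both are resolved in {{finishRow}} before anything is handed to the data file.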
*Each cell type, specifically.* Live regular cells were already supported.
Tombstone cells: flag building now mirrors {{Cell.Serializer}} exactly — strict
{{ttl != NO_TTL}} expiring semantics and mutually-exclusive deleted/expiring
flags (the original code emitted {{IS_DELETED|IS_EXPIRING}} plus a wasted TTL
byte on every tombstone cell). Expiring cells: expired-to-tombstone conversion
runs through a reusable liveness ({{ttlToTombstone()}}), with the same purge
decision the iterator reaches. Path-carrying cells (collection elements, UDT
fields): the path travels as raw bytes from reader scratch to writer output
with no intermediate object. Deletion-only complex columns (a deletion with
zero surviving cells) surface through a pause-position in the reader so the
merge can fold their deletions without fabricating cells. Counters are the one
remaining cell type and stay gated.
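The flag rule for tombstone and expiring cells can be sketched as below. The constants are illustrative stand-ins rather than values copied from {{Cell.Serializer}}, and {{CellFlags}} is a hypothetical class; what the sketch shows is only the mutual exclusion described above.

```java
/** Hypothetical sketch of mutually exclusive deleted/expiring cell flags. */
final class CellFlags
{
    // Illustrative constants; assumed, not taken from the Cassandra source.
    static final int NO_TTL = 0;
    static final long NO_DELETION_TIME = Long.MAX_VALUE;
    static final int IS_DELETED = 0x01;
    static final int IS_EXPIRING = 0x02;

    static int flags(int ttl, long localDeletionTime)
    {
        // Expiring is decided strictly by the TTL, never by the deletion time.
        boolean expiring = ttl != NO_TTL;
        // A tombstone has a deletion time but no TTL.
        boolean deleted = !expiring && localDeletionTime != NO_DELETION_TIME;

        int flags = 0;
        if (deleted)
            flags |= IS_DELETED;
        else if (expiring)
            flags |= IS_EXPIRING;
        return flags;
    }

    /** Only expiring cells carry a TTL on disk, so tombstones write no TTL byte. */
    static boolean writesTtl(int flags)
    {
        return (flags & IS_EXPIRING) != 0;
    }
}
```

With this shape a tombstone can never come out as {{IS_DELETED|IS_EXPIRING}}, so the extra TTL byte described above is not written.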
*BTI output.* The BIG-specific index logic was extracted behind an event-shaped
seam ({{CursorIndexWriter}}: partition start, row written, partition end) — the
BIG implementation is the original code moved verbatim, gated by a
pure-refactor byte-identity run. The BTI implementation feeds
{{RowIndexWriter}} and the partition index from the cursor's raw bytes. The
interesting problem is that the trie builders want {{ClusteringPrefix}} /
{{ByteComparable}} boundaries while the cursor holds raw serialized clustering
bytes: a reusable lazy view parses component boundaries out of the descriptor's
buffer on demand, and because the trie APIs retain references across calls,
block-boundary prefixes and partition keys are snapshotted at a bounded
per-index-block cost rather than handed the mutable reusables. Single-block
partitions reproduce the iterator's no-row-trie rule exactly.
*Wide schemas, views, scale.* The >64-column large-subset wire encoding now
matches {{Columns.Serializer}} in both encoding modes and at the exact
mode-selection boundary (see bug list). Materialized-view tables exercise
{{enforceStrictLiveness}} (already implemented in the cursor, now proven
identical). Verified at scale: a ~2.6GiB single partition — intra-partition
offsets past {{Integer.MAX_VALUE}}, ~660K index blocks in one promoted-index
entry / row trie — byte-identical in both formats.
*{{keepOriginals}} fix.* {{SSTableRewriter.moveStarts}} obsoleted fully-covered
originals even when the caller asked to keep them (only the bulk
{{obsoleteOriginals()}} honored the flag). One-line guard; it also lets every
differential run execute with early open enabled, making the whole corpus a
standing regression test for that path.
h2. Bugs found in the existing code
All of these predate this work; each is fixed on the branch with a pinning test
that fails without the fix. The byte-for-byte bar found every one of them.
*Data correctness.* Same-timestamp cell conflicts resolved with an inverted
value comparison — cursor compaction kept the lexically smaller value where
{{Cells.resolveRegular}} keeps the greater, so cursor- and iterator-compacted
replicas silently diverged on equal-timestamp writes; the TTL tie-break rule
was missing entirely. On DESC clustering columns, empty (zero-length)
clustering values sorted on the wrong side of valued ones — the raw comparison
decided absent-component order from flag bits without consulting
{{ReversedType}} — producing wrongly-ordered merge output within a partition
and wrong covered-clustering stats across partitions.
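The direction of the equal-timestamp tie-break is easy to get backwards, so a reduced sketch may help. It covers only the timestamp and value rules: the tombstone and TTL tie-breaks that sit between them in the real resolution are deliberately omitted, {{TieBreak}} is a hypothetical class, and the unsigned lexicographic byte comparison is an assumption about how values are ordered.

```java
import java.util.Arrays;

/** Hypothetical, reduced sketch of equal-timestamp cell resolution. */
final class TieBreak
{
    /** Returns the winning value: newer timestamp first, then the greater value. */
    static byte[] resolve(long leftTimestamp, byte[] leftValue,
                          long rightTimestamp, byte[] rightValue)
    {
        if (leftTimestamp != rightTimestamp)
            return leftTimestamp > rightTimestamp ? leftValue : rightValue;

        // Tie: keep the lexically GREATER value (the inverted comparison kept the smaller).
        return Arrays.compareUnsigned(leftValue, rightValue) >= 0 ? leftValue : rightValue;
    }
}
```

Whatever the rule, it has to be symmetric: resolving (a, b) and (b, a) must pick the same value, otherwise replicas that merge in different orders diverge.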
*Output corruption.* The >64-column subset encoder dropped every present column
after the last missing one (a loop bounded by the wrong variable) and disagreed
with the deserializer about encoding mode for odd superset sizes — cursor
compaction of wide sparse rows wrote sstables that fail with EOF on every
subsequent read. No pre-existing test exercised the present-index mode.
*Format divergence.* {{previousUnfilteredSize}} was written as a literal 0 for
every row and marker (dead on today's read path, but every cursor-compacted
file differed from the iterator's on every row). Tombstone cells carried a
spurious {{IS_EXPIRING}} flag plus an extra TTL byte each. The final
promoted-index block's width excluded the end-of-partition marker byte, and
partitions crossing {{column_index_size}} exactly once got no promoted index at
all — indexed reads of partitions in the 1-2x {{column_index_size}} band
(64-128KiB at the default) lost intra-partition seeking.
*Stats.* Partitions with no static values wrote an empty static row that
over-counted {{totalRows}}/{{totalColumnsSet}}.
{{ReusableDeletionTime.reset(long, long)}} classified {{NO_DELETION_TIME}} as
invalid instead of live, letting live complex-column marker deletions past the
stats collector's guard — cursor-compacted sstables with static complex columns
recorded {{minTimestamp=Long.MIN_VALUE}} and a bogus tombstone count.
*Allocation on the garbage-free path.* {{ClusteringPrefix.Kind.values()}} was
called once per row read and per marker written (every enum {{values()}} call
allocates); fixed with a shared cached array — this was the dominant scaling
allocation on the cursor path. Sparse rows (any row missing a column)
re-materialized a {{Columns}} object and the per-column type arrays per row
through the subset decode; replaced with a direct bitmask walk for supersets
under 64 columns.
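Both fixes follow well-known patterns, sketched here in isolation. {{GarbageFreeLookups}} and its {{Kind}} enum are stand-ins, not the Cassandra types, and the bit polarity (a set bit meaning "column present") is an assumption made for the illustration; the real subset encoding may use the opposite convention.

```java
/** Hypothetical sketch of two allocation-free lookups. */
final class GarbageFreeLookups
{
    enum Kind { ROW, RANGE_START, RANGE_END } // stand-in for the real enum

    // values() clones its backing array on every call, so clone once and share.
    private static final Kind[] KINDS = Kind.values();

    static Kind kindOf(int ordinal)
    {
        return KINDS[ordinal];
    }

    /**
     * Writes the indices of the set bits, lowest first, into a caller-owned
     * array and returns how many were written. Nothing is allocated.
     */
    static int presentColumns(long bitmask, int[] out)
    {
        int n = 0;
        while (bitmask != 0)
        {
            out[n++] = Long.numberOfTrailingZeros(bitmask);
            bitmask &= bitmask - 1; // clear the lowest set bit
        }
        return n;
    }
}
```

The caller-owned {{int[]}} plays the role of a reusable scratch buffer: sized once for the superset, then reused for every row.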
*Lifecycle.* The {{SSTableRewriter.moveStarts}} / {{keepOriginals}} violation
described above.
> Complete cursor compaction coverage (complex columns, BTI, wide schemas),
> verified byte-for-byte against the iterator path
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: CASSANDRA-21452
> URL: https://issues.apache.org/jira/browse/CASSANDRA-21452
> Project: Apache Cassandra
> Issue Type: Improvement
> Components: Local/Compaction
> Reporter: Jon Haddad
> Assignee: Jon Haddad
> Priority: Normal
>
> h2. Background
> CASSANDRA-20918 introduced cursor-based compaction ({{CursorCompactor}},
> {{SSTableCursorReader}}, {{SSTableCursorWriter}}): a streaming merge that
> avoids the per-partition/row/cell object materialization of the iterator path
> ({{CompactionIterator}}). Reported results: 2–5x faster compaction, up to
> ~100x allocation reduction, with a fixed memory footprint per sstable.
> The motivating problem is CASSANDRA-20428: {{ByteArrayAccessor.read}} — the
> {{new byte[length]}} per value performed by cell and clustering
> deserialization — is a dominant allocation source wherever the iterator-based
> read machinery runs, and compaction pays it for every cell of every sstable
> it merges. The cursor path eliminates that allocation class entirely, but
> only for the schemas and configurations it supports; everything else silently
> falls back to the iterator path at {{AbstractCompactionPipeline.create}}.
> h2. What was missing
> As merged, {{CursorCompactor.isSupported()}} / {{unsupportedMetadata()}}
> reject most real-world schemas and configurations, so the allocation and
> throughput wins do not apply where they matter most:
> # *Multi-cell columns* — non-frozen collections and UDTs are unsupported. Any
> table with a {{map}}, {{set}}, {{list}}, or non-frozen UDT column falls back.
> # *BTI output format* — the cursor only writes the BIG format. Clusters on
> the trie format (the trunk default direction) never use the cursor path.
> # *Partial-range scanners* — token-subrange compactions fall back.
> # *Counters* — counter tables fall back.
> # *Garbage-skipping modes* — {{tombstoneOption != NONE}} falls back (default
> is NONE, so this is lower priority).
> Out of scope, by design: secondary indexes / SAI (index observers consume
> materialized rows and need their own design) and pre-current sstable versions
> (compaction rewrites to the current version, so the gap phases out on its
> own).
> Equally missing was *verification*: the existing cursor tests asserted each
> path's output against expected CQL results independently, but never compared
> the two paths against each other, and never at the file level. Two compaction
> implementations that must be interchangeable at scale need a stronger bar
> than "both look right in isolation" — divergence between cursor- and
> iterator-compacted replicas is silent and permanent.
> h2. What needs to be implemented
> Close the support gaps incrementally — complex columns, then BTI output, then
> partial-range scanners, then counters, with the garbage-skipper equivalent
> optional — with each increment landing only when it passes the verification
> bar below. Wide schemas (>64-column subset encodings), materialized-view
> tables (strict liveness), and multi-gigabyte partitions are part of the
> supported surface and must be covered, not special-cased away.
> Two standing constraints:
> * *Garbage-free is the point of the feature.* Every change must preserve the
> no-per-element-allocation property of the hot path, enforced by measurement
> rather than code review alone.
> * *Behavior must be identical, not merely equivalent.* Purge decisions,
> tie-breaks, stats, index structure — everything.
> h2. Required: byte-for-byte differential verification
> The two compaction implementations must be interchangeable, and the way to
> prove that is a test harness that compacts the same inputs through both
> production pipelines and compares the outputs *byte for byte* — every
> component of every output sstable, in both sstable formats. Any component
> permitted to diverge needs an explicit, justified exception; the expectation
> is that none are needed. (Experience so far supports the bar: byte
> divergences found this way have been bugs, not justified differences.)
> Coverage should span the supported surface — the tombstone, TTL, and
> timestamp edge cases where merge implementations classically disagree,
> randomized schemas and workloads, and large-scale shapes (wide schemas,
> many-sstable merges, multi-gigabyte partitions) — and should grow with each
> increment so a closed gap can never silently fall back or regress. The
> harness design beyond that is an implementation detail of the patch.