[ 
https://issues.apache.org/jira/browse/CASSANDRA-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159370#comment-17159370
 ] 

Andres de la Peña commented on CASSANDRA-15907:
-----------------------------------------------

[~maedhroz] I couldn't resist giving the per-row lazy pre-fetch a try; I have 
left it 
[here|https://github.com/adelapena/cassandra/commit/accf2a47c341875942b0d8b06c016cc0d66d62cb].

Instead of consuming all the contents of each merged partition up front, it 
consumes them row by row until the replica contents are exhausted. That way, if 
there are no conflicts, it only caches one row per replica instead of an entire 
partition per replica. Also, if it finds that there are rows to fetch from a 
replica, it advances the first-phase merged row iterator a bit further, until 
reaching a certain cache size, trying to strike a balance between the cache size 
and the number of RFP queries. Right now that desired cache size is hardcoded to 
100, but we could use a config property, or the query limit, for example. We 
could also leave it unbounded to minimize the number of RFP queries; the main 
advantage of this approach is that in the absence of conflicts nothing needs to 
be cached, which should benefit the most common case, which is when there are 
no conflicts.
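
In rough terms, the consumption loop looks like the sketch below. This is only a 
toy model of the batching logic, not the actual code in the linked commit; every 
name in it is made up for illustration.
{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Toy model of the per-row lazy pre-fetch: merged first-phase rows are consumed one
// at a time, nothing is cached while there are no conflicts, and once a row needs
// completion from a replica we keep advancing (and caching) until the cache reaches
// a target size, so that several keys can be grouped into a single RFP query.
public class LazyPreFetchSketch
{
    static final int TARGET_CACHE_SIZE = 100; // hardcoded today; could become a config property or the query limit

    interface MergedRow
    {
        Object key();
        boolean needsFetchFromReplica(); // true when a replica's filtered response may be stale/incomplete here
    }

    static void protect(Iterator<MergedRow> firstPhaseRows)
    {
        Queue<MergedRow> cached = new ArrayDeque<>();
        List<Object> keysToFetch = new ArrayList<>();

        while (firstPhaseRows.hasNext())
        {
            MergedRow row = firstPhaseRows.next();

            if (keysToFetch.isEmpty() && !row.needsFetchFromReplica())
                continue; // common case, no conflict so far: nothing beyond the current row is cached

            cached.add(row);
            if (row.needsFetchFromReplica())
                keysToFetch.add(row.key());

            // Once something must be fetched, keep advancing until the cache reaches the
            // target size (or the partition ends), so several keys share one RFP query.
            if (cached.size() >= TARGET_CACHE_SIZE || !firstPhaseRows.hasNext())
            {
                issueRfpQuery(keysToFetch);
                cached.clear();
                keysToFetch.clear();
            }
        }
    }

    static void issueRfpQuery(List<Object> keys)
    {
        System.out.println("RFP query for keys " + keys); // stand-in for the real second-phase read
    }
}
{code}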

To illustrate how the per-row approach behaves, let's see this example:
{code:python}
self._prepare_cluster(
    create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY(k, c))",
    create_index="CREATE INDEX ON t(v)",
    both_nodes=["INSERT INTO t (k, c, v) VALUES (0, 0, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 1, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 2, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 3, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 4, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 5, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 6, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 7, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 8, 'old')",
                "INSERT INTO t (k, c, v) VALUES (0, 9, 'old')"],
    only_node1=["INSERT INTO t (k, c, v) VALUES (0, 4, 'new')",
                "INSERT INTO t (k, c, v) VALUES (0, 6, 'new')"])
self._assert_all("SELECT c FROM t WHERE v = 'old'", rows=[[0], [1], [2], [3], 
[5], [7], [8], [9]])
{code}
Without the per-row approach we cached 20 rows (10 per replica) and issued a 
single RFP query. In contrast, with the per-row approach:
 * If the target cache size is very high or unbounded, we will cache a max of 
12 rows and we will need 1 RFP query.
 * If the target cache size is 2, we will cache a max of 8 rows and we will 
need 1 RFP query.
 * If we use a target cache size of 1, we will cache a max of 6 rows but, 
unlike before, we will need 2 separate RFP queries.
 * If there are no conflicts we will only have one cached row per replica: the 
current one.

Note that consuming rows from the first-phase iterator to populate the cache can 
still cause the cache to grow without bound, so we still need the guardrail. The 
(perhaps configurable) target cache size that I mention is only used to try to 
strike a balance between the cache size and the grouping of primary keys to 
fetch.
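
As for the guardrail itself, it could be as simple as a per-query counter that 
fails the read once too many rows have been cached, in the spirit of 
{{tombstone_failure_threshold}}. A minimal sketch, with a made-up option name 
({{cached_rows_failure_threshold}}) and a generic exception standing in for 
whatever we would actually throw:
{code:java}
// Per-query guardrail on rows cached by replica filtering protection.
// cached_rows_failure_threshold is hypothetical, not an existing cassandra.yaml setting.
public class CachedRowsGuardrail
{
    private final int failureThreshold; // would come from configuration
    private int cachedRows;             // rows cached so far for this query

    public CachedRowsGuardrail(int failureThreshold)
    {
        this.failureThreshold = failureThreshold;
    }

    // Called each time RFP caches another first-phase row for this query.
    public void onRowCached()
    {
        if (++cachedRows > failureThreshold)
            throw new IllegalStateException("Replica filtering protection cached " + cachedRows +
                                            " rows, above cached_rows_failure_threshold = " + failureThreshold);
    }
}
{code}
A warn threshold could sit alongside the failure threshold, as we do for other 
limits in {{cassandra.yaml}}.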

> Operational Improvements & Hardening for Replica Filtering Protection
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-15907
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-15907
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Consistency/Coordination, Feature/2i Index
>            Reporter: Caleb Rackliffe
>            Assignee: Caleb Rackliffe
>            Priority: Normal
>              Labels: 2i, memory
>             Fix For: 3.0.x, 3.11.x, 4.0-beta
>
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> CASSANDRA-8272 uses additional space on the heap to ensure correctness for 2i 
> and filtering queries at consistency levels above ONE/LOCAL_ONE. There are a 
> few things we should follow up on, however, to make life a bit easier for 
> operators and generally de-risk usage:
> (Note: Line numbers are based on {{trunk}} as of 
> {{3cfe3c9f0dcf8ca8b25ad111800a21725bf152cb}}.)
> *Minor Optimizations*
> * {{ReplicaFilteringProtection:114}} - Given we size them up-front, we may be 
> able to use simple arrays instead of lists for {{rowsToFetch}} and 
> {{originalPartitions}}. Alternatively (or also), we may be able to null out 
> references in these two collections more aggressively. (ex. Using 
> {{ArrayList#set()}} instead of {{get()}} in {{queryProtectedPartitions()}}, 
> assuming we pass {{toFetch}} as an argument to {{querySourceOnKey()}}.)
> * {{ReplicaFilteringProtection:323}} - We may be able to use 
> {{EncodingStats.merge()}} and remove the custom {{stats()}} method.
> * {{DataResolver:111 & 228}} - Cache an instance of 
> {{UnaryOperator#identity()}} instead of creating one on the fly.
> * {{ReplicaFilteringProtection:217}} - We may be able to scatter/gather 
> rather than serially querying every row that needs to be completed. This 
> isn't a clear win perhaps, given it targets the latency of single queries and 
> adds some complexity. (Certainly a decent candidate even to kick out of this 
> issue.)
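> For illustration only, the scatter/gather shape could look like the generic 
> sketch below (plain {{CompletableFuture}} and made-up method names; not the 
> actual {{ReplicaFilteringProtection}} code, which would go through Cassandra's 
> own messaging and response handling):
> {code:java}
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.stream.Collectors;
> 
> public class ScatterGatherSketch
> {
>     // Fire the per-key fetches all at once, then wait for the whole batch,
>     // instead of issuing and awaiting each replica query serially.
>     static List<String> fetchAll(List<Integer> keys)
>     {
>         List<CompletableFuture<String>> futures =
>             keys.stream().map(ScatterGatherSketch::fetchAsync).collect(Collectors.toList());
>         return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
>     }
> 
>     static CompletableFuture<String> fetchAsync(int key)
>     {
>         return CompletableFuture.supplyAsync(() -> "row-for-" + key); // stand-in for a replica read
>     }
> }
> {code}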
> *Documentation and Intelligibility*
> * There are a few places (CHANGES.txt, tracing output in 
> {{ReplicaFilteringProtection}}, etc.) where we mention "replica-side 
> filtering protection" (which makes it seem like the coordinator doesn't 
> filter) rather than "replica filtering protection" (which sounds more like 
> what we actually do, which is protect ourselves against incorrect replica 
> filtering results). It's a minor fix, but would avoid confusion.
> * The method call chain in {{DataResolver}} might be a bit simpler if we put 
> the {{repairedDataTracker}} in {{ResolveContext}}.
> *Testing*
> * I want to bite the bullet and get some basic tests for RFP (including any 
> guardrails we might add here) onto the in-JVM dtest framework.
> *Guardrails*
> * As it stands, we don't have a way to enforce an upper bound on the memory 
> usage of {{ReplicaFilteringProtection}} which caches row responses from the 
> first round of requests. (Remember, these are later merged with the second 
> round of results to complete the data for filtering.) Operators will 
> likely need a way to protect themselves, i.e. simply fail queries if they hit 
> a particular threshold rather than GC nodes into oblivion. (Having control 
> over limits and page sizes doesn't quite get us there, because stale results 
> _expand_ the number of incomplete results we must cache.) The fun question is 
> how we do this, with the primary axes being scope (per-query, global, etc.) 
> and granularity (per-partition, per-row, per-cell, actual heap usage, etc.). 
> My starting disposition on the right trade-off between 
> performance/complexity and accuracy is having something along the lines of 
> cached rows per query. Prior art suggests this probably makes sense alongside 
> things like {{tombstone_failure_threshold}} in {{cassandra.yaml}}.


