[ 
https://issues.apache.org/jira/browse/IGNITE-28750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Valuyskiy updated IGNITE-28750:
------------------------------------
    Description: 
A local Continuous Query created with *ContinuousQuery#setLocal(true)* may fail 
with *AssertionError* during transactional rollback after a node failure.

During rollback, *IgniteTxHandler#applyPartitionsUpdatesCounters* logs rollback 
ranges and notifies the Continuous Query subsystem about skipped update 
counters:

 
{code:java}
ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr, topVer, rollbackOnPrimary);{code}
 

This call enters {*}CacheContinuousQueryManager#skipUpdateCounter{*}:

 
{code:java}
@Nullable public CounterSkipContext skipUpdateCounter(
    @Nullable CounterSkipContext skipCtx,
    int part,
    long cntr,
    AffinityTopologyVersion topVer,
    boolean primary
) {
    for (CacheContinuousQueryListener lsnr : lsnrs.values())
        skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary);

    return skipCtx;
}
{code}
 

Currently this method invokes *skipUpdateCounter* on every Continuous Query 
listener registered for the cache. There is no filtering by local-only mode, 
primary-only mode, or listener type.

This is problematic for local Continuous Queries: for such queries the 
*CacheContinuousQueryHandler* is registered only on the node where the query 
was started. In this case the handler has the following state:

 
{code:java}
loc == true
locOnly == true{code}
 

where *loc* means that the current node is the node that initiated the 
Continuous Query:

 
{code:java}
final boolean loc = nodeId.equals(ctx.localNodeId());{code}
 

and *locOnly* means that the query was created with 
{*}ContinuousQuery#setLocal(true){*}.

This combination is valid for a local Continuous Query. However, 
*CacheContinuousQueryHandler.CacheContinuousQueryListener#skipUpdateCounter* 
currently rejects this state:

 
{code:java}
if (loc) {
    assert !locOnly;
    final Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
        handleEvent(ctx, skipCtx.entry());
    ...
}
{code}
 

As a result, when update counter cleanup during transaction rollback calls 
*skipUpdateCounter* for a cache that has a local CQ listener, the local-only 
listener enters this branch and fails on *assert !locOnly* with 
{*}AssertionError{*}.

The important point is that skipped update counters are part of the distributed 
Continuous Query recovery/order mechanism. For regular distributed Continuous 
Queries, skipped counters are needed to close gaps in per-partition update 
counter sequences. A skipped counter is represented as a synthetic 
*CacheContinuousQueryEntry* marked as filtered:

 
{code:java}
entry = new CacheContinuousQueryEntry(..., part, cntr, topVer, ...);
entry.markFiltered();{code}
 

This synthetic entry is passed through the same recovery path as ordinary CQ 
entries. It allows *CacheContinuousQueryPartitionRecovery* to advance the 
counter sequence and release pending events that were waiting for a missing 
counter.

For example:

 
{code:java}
counter 10 -> real event
counter 11 -> rolled back / filtered / skipped
counter 12 -> real event{code}
 

Without a skip marker for counter 11, the recovery logic may not be able to 
safely release counter 12. Therefore, distributed CQ handlers process skipped 
counters through *handleEvent(...)* / partition recovery.
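
The gap-closing behaviour described above can be illustrated with a minimal sketch. It is a simplified model, not the actual *CacheContinuousQueryPartitionRecovery* code: the class name, the *Entry* type and the *onEntry* method are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified model of per-partition ordered delivery; not Ignite's class. */
class PartitionRecoverySketch {
    /** Hypothetical entry: an update counter plus a flag for skipped (filtered) counters. */
    static final class Entry {
        final long cntr;
        final boolean filtered;

        Entry(long cntr, boolean filtered) {
            this.cntr = cntr;
            this.filtered = filtered;
        }
    }

    /** Last counter that was processed in order. */
    private long lastCntr;

    /** Entries that arrived ahead of a missing counter, keyed by counter. */
    private final TreeMap<Long, Entry> pending = new TreeMap<>();

    PartitionRecoverySketch(long lastCntr) {
        this.lastCntr = lastCntr;
    }

    /** Accepts an entry and returns counters of real events that can now be delivered, in order. */
    List<Long> onEntry(Entry e) {
        List<Long> ready = new ArrayList<>();

        if (e.cntr <= lastCntr)
            return ready; // Stale or duplicate counter, nothing to do.

        pending.put(e.cntr, e);

        // Drain the buffer while the next expected counter is present.
        while (!pending.isEmpty() && pending.firstKey() == lastCntr + 1) {
            Entry next = pending.pollFirstEntry().getValue();

            lastCntr = next.cntr;

            // A filtered entry only advances the sequence; it is never delivered.
            if (!next.filtered)
                ready.add(next.cntr);
        }

        return ready;
    }
}
```

Starting from a last processed counter of 9, feeding counter 10 releases it immediately, counter 12 is held back because 11 is missing, and a filtered entry for counter 11 then releases 12 without delivering 11 itself.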

For local-only Continuous Queries, regular update events *are NOT* delivered 
through the distributed partition recovery path. In 
{*}CacheContinuousQueryHandler#onEntryUpdated{*}, local-only events are 
delivered directly to the local listener:

 
{code:java}
if (loc) {
    if (!locOnly) {
        Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
            handleEvent(ctx, entry);
        notifyLocalListener(evts, trans);
        ...
    }
    else if (!entry.isFiltered())
        notifyLocalListener(F.asList(evt), trans);
}
{code}
 

So for *locOnly == true*, regular user events bypass 
{*}handleEvent(...){*}, {*}CacheContinuousQueryPartitionRecovery{*}, and the 
skipped-counter recovery mechanism. They are delivered directly to the local 
listener.

Because of that, local-only CQ listeners should not participate in 
*skipUpdateCounter* processing. A skipped counter is not a user-visible event, 
and for local-only CQ there is no distributed pending-event chain that needs to 
be advanced by a synthetic filtered entry.

The current implementation violates this invariant because 
*CacheContinuousQueryManager#skipUpdateCounter* calls every registered 
listener unconditionally, including local-only listeners.

The fix is to explicitly identify local-only CQ listeners and skip them during 
skipped-counter processing.
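
One possible shape of such filtering is sketched below. This is a simplified, self-contained model and not the actual patch: the *Listener* interface, its *isLocalOnly()* accessor and the *RecordingListener* class are assumptions made for illustration, and the real *CacheContinuousQueryListener* signature (with *cctx*, *skipCtx*, *topVer* and *primary*) is deliberately reduced.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of manager-side filtering; not the actual Ignite code. */
class SkipCounterFilterSketch {
    /** Hypothetical listener abstraction; isLocalOnly() is an assumed accessor. */
    interface Listener {
        boolean isLocalOnly();

        void skipUpdateCounter(int part, long cntr);
    }

    /** Notifies only listeners that take part in distributed counter recovery. */
    static void skipUpdateCounter(Iterable<? extends Listener> lsnrs, int part, long cntr) {
        for (Listener lsnr : lsnrs) {
            // Local-only listeners have no pending-event chain to advance.
            if (lsnr.isLocalOnly())
                continue;

            lsnr.skipUpdateCounter(part, cntr);
        }
    }

    /** Illustration-only listener: records skipped counters, fails if it is local-only. */
    static final class RecordingListener implements Listener {
        final boolean locOnly;
        final List<Long> skipped = new ArrayList<>();

        RecordingListener(boolean locOnly) {
            this.locOnly = locOnly;
        }

        @Override public boolean isLocalOnly() {
            return locOnly;
        }

        @Override public void skipUpdateCounter(int part, long cntr) {
            // Mirrors the 'assert !locOnly' branch described above.
            if (locOnly)
                throw new AssertionError("Local-only listener must not process skipped counters");

            skipped.add(cntr);
        }
    }
}
```

With one distributed and one local-only *RecordingListener* registered, a skipped counter reaches only the distributed one, so the asserted branch is never entered for the local-only listener.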

It is important NOT to use *CacheContinuousQueryListener#isPrimaryOnly()* for 
this filtering. *isPrimaryOnly()* is not equivalent to local-only. It is 
derived as:
{code:java}
return locOnly && !skipPrimaryCheck;{code}
For PARTITIONED caches with local CQ:
{code:java}
locOnly == true
skipPrimaryCheck == false
isPrimaryOnly() == true{code}
So filtering by *isPrimaryOnly()* would happen to exclude the listener in this 
case, but only by coincidence.

However, for REPLICATED caches a local CQ started on an affinity node sets 
{*}skipPrimaryCheck{*}:
{code:java}
boolean skipPrimaryCheck =
    loc &&
    cctx.config().getCacheMode() == CacheMode.REPLICATED &&
    cctx.affinityNode();{code}
In this case:
{code:java}
locOnly == true
skipPrimaryCheck == true
isPrimaryOnly() == false{code}
The query is still local-only, but it is not primary-only. Therefore filtering 
by *isPrimaryOnly()* would not exclude local CQ listeners on replicated caches, 
and the same invalid *skipUpdateCounter* path with 
*loc == true && locOnly == true* could still be reached.

The proper condition is local-only mode itself, not primary-only mode.
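
The difference between the two predicates can be checked with a small standalone sketch. It only re-evaluates the boolean expressions quoted above; the *CacheMode* enum here is a local stand-in for Ignite's enum, and the class and method names are hypothetical.

```java
/** Hypothetical helper that re-evaluates the predicates quoted in this description. */
class PrimaryOnlyVsLocalOnly {
    /** Local stand-in for Ignite's CacheMode, limited to the two modes discussed here. */
    enum CacheMode { PARTITIONED, REPLICATED }

    /** Same expression as the skipPrimaryCheck derivation quoted above. */
    static boolean skipPrimaryCheck(boolean loc, CacheMode mode, boolean affinityNode) {
        return loc && mode == CacheMode.REPLICATED && affinityNode;
    }

    /** Same expression as the isPrimaryOnly() derivation quoted above. */
    static boolean isPrimaryOnly(boolean locOnly, boolean skipPrimaryCheck) {
        return locOnly && !skipPrimaryCheck;
    }

    /** Returns true when a filter based on isPrimaryOnly() would miss a local-only listener. */
    static boolean primaryOnlyFilterMissesLocalCq(CacheMode mode, boolean affinityNode) {
        boolean loc = true;     // Query was started on this node.
        boolean locOnly = true; // Query was created with setLocal(true).

        return locOnly && !isPrimaryOnly(locOnly, skipPrimaryCheck(loc, mode, affinityNode));
    }
}
```

For a local CQ, *primaryOnlyFilterMissesLocalCq* returns false for PARTITIONED and true for REPLICATED on an affinity node, which is the case where an *isPrimaryOnly()* filter would let the local-only listener through.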

> AssertionError in local continuous query during rollback update counter skip
> ----------------------------------------------------------------------------
>
>                 Key: IGNITE-28750
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28750
>             Project: Ignite
>          Issue Type: Task
>            Reporter: Oleg Valuyskiy
>            Assignee: Oleg Valuyskiy
>            Priority: Major
>              Labels: ise
>          Time Spent: 10m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian Jira
(v8.20.10#820010)