Dmitry Uskov created IGNITE-7868:
------------------------------------

             Summary: Continuous Query LocalListener on backup node works 
unstable
                 Key: IGNITE-7868
                 URL: https://issues.apache.org/jira/browse/IGNITE-7868
             Project: Ignite
          Issue Type: Bug
    Affects Versions: 2.3
            Reporter: Dmitry Uskov


 I have two nodes (*node1* and *node2*) with configuration:
{code:java}
Ignition.start();

ProgramContext.ignite = Ignition.ignite(); // ProgramContext.ignite is a public static field

String cacheName = "A-Cache";

CacheConfiguration<CacheKey, CacheValue> cacheConfiguration = new CacheConfiguration<>();
cacheConfiguration.setName(cacheName);
cacheConfiguration.setAtomicityMode(CacheAtomicityMode.ATOMIC);
cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
cacheConfiguration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
cacheConfiguration.setBackups(1);

IgniteCache<CacheKey, CacheValue> cache = ProgramContext.ignite.getOrCreateCache(cacheConfiguration);

ContinuousQuery<CacheKey, CacheValue> query = new ContinuousQuery<>();

ClusterNode node = ProgramContext.ignite.cluster().localNode();

// Remote filter: pass only CREATED events, and only on the node that registered the query.
query.setRemoteFilterFactory((Factory<CacheEntryEventFilter<CacheKey, CacheValue>>) () ->
    (CacheEntryEventFilter<CacheKey, CacheValue>) event -> {
        boolean currentNodeIsPrimary = node.equals(ProgramContext.ignite.cluster().localNode());
        boolean eventTypeIsCreated = event.getEventType().equals(javax.cache.event.EventType.CREATED);
        return currentNodeIsPrimary && eventTypeIsCreated;
    });
// Local listener: print every event that passed the remote filter.
query.setLocalListener(cacheEntryEvents -> {
    for (CacheEntryEvent<? extends CacheKey, ? extends CacheValue> cacheEntryEvent : cacheEntryEvents) {
        System.out.println("CacheEntryUpdatedListener: " + cacheEntryEvent.getKey() + "/" + cacheEntryEvent.getValue());
    }
});
cache.query(query);
{code}
CacheKey:
{code:java}
public class CacheKey {

 public static final String DEFAULT_AFFINITY_KEY = "cons";

 private static String affinityKeyValue = DEFAULT_AFFINITY_KEY;

 private String key;

 @AffinityKeyMapped
 private String affinityKey;

 public CacheKey(String key) {
 this.key = key;
 this.affinityKey = affinityKeyValue; //always DEFAULT_AFFINITY_KEY 
 }

 //... getters/setters and toString()

}{code}
CacheValue:
{code:java}
public class CacheValue {

 private String value;

 public CacheValue(String value) {
 this.value = value;
 }
 //... getters/setters and toString()
 
}{code}
I put two values into the cache. Because affinityKey always equals DEFAULT_AFFINITY_KEY, 
one node (suppose *node1*) is primary for both values. The LocalListener 
fired twice on *node1*.
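
To illustrate why both entries end up with the same primary node, here is a minimal plain-Java sketch. It is a toy model only, not Ignite's actual affinity function: the partition count, the hash-modulo mapping, and the names AffinitySketch and partitionFor are assumptions made for illustration. The only point it shows is that when the partition is derived from the affinity key rather than from the full key, two entries that share an affinity key always share a partition.

```java
public class AffinitySketch {

    // Hypothetical partition count, chosen only for this illustration.
    static final int PARTITIONS = 1024;

    /** Simplified stand-in for the CacheKey class from the report. */
    static class CacheKey {
        final String key;
        final String affinityKey;

        CacheKey(String key) {
            this.key = key;
            this.affinityKey = "cons"; // same constant as DEFAULT_AFFINITY_KEY
        }
    }

    /**
     * Toy affinity mapping (an assumption, not Ignite's algorithm):
     * the partition depends only on the affinity key, never on key.key.
     */
    static int partitionFor(CacheKey cacheKey) {
        return Math.floorMod(cacheKey.affinityKey.hashCode(), PARTITIONS);
    }

    public static void main(String[] args) {
        CacheKey first = new CacheKey("node1-0");
        CacheKey second = new CacheKey("node1-1");

        // Both lines print the same partition number, because the
        // affinity key is identical for both entries.
        System.out.println(first.key + " -> partition " + partitionFor(first));
        System.out.println(second.key + " -> partition " + partitionFor(second));
    }
}
```

Since a partition has exactly one primary node at a time, sharing a partition means sharing a primary, which matches the observation that only one node reported both entries.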

Console output:
{noformat}
CacheEntryUpdatedListener: CacheKey{key='node1-0', 
affinityKey='cons'}/CacheValue{value='node1-0'}
CacheEntryUpdatedListener: CacheKey{key='node1-1', 
affinityKey='cons'}/CacheValue{value='node1-1'}
{noformat}
 

Then I stopped *node1* and saw that the LocalListener on *node2* fired only once, 
although I expected it to fire twice.

Console output on node1:
{noformat}
[12:36:36] Ignite node stopped OK [uptime=00:00:14.243]{noformat}
Console output on node2:
{noformat}
[12:36:36] Topology snapshot [ver=3, servers=1, clients=0, CPUs=4, heap=1.6GB]
CacheEntryUpdatedListener: CacheKey{key='node1-1', 
affinityKey='cons'}/CacheValue{value='node1-1'}{noformat}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
