[ 
https://issues.apache.org/jira/browse/IGNITE-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417629#comment-16417629
 ] 

Nikolay Izhikov edited comment on IGNITE-8035 at 3/28/18 4:02 PM:
------------------------------------------------------------------

Hello, [~ruslangm]

I wrote a test to try to reproduce your issue,
but I could not reproduce it.

It seems you misused the Continuous Query API.
If you want to get the Ignite instance in a remote filter, you should use the 
@IgniteInstanceResource annotation.
Please take a look at RemoteFactory in the test.

{code:java}
package org.apache.ignite.internal.processors.cache.query.continuous;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;

/**
 * Checks that continuous query listeners do not receive duplicated {@code CREATED} events,
 * including after one of the two started nodes is stopped.
 * <p>
 * Two continuous queries are registered on the same cache. Each query uses a remote filter that
 * accepts an event only when the filter is evaluated on the node handed to its factory. The test
 * expects the total number of {@code CREATED} events seen by both listeners to be exactly
 * {@link #DATA_AMOUNT}.
 */
public class GridCacheContinuousQueryDuplicateEventsTest extends GridCommonAbstractTest implements Serializable {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Timeout passed to {@link GridTestUtils#waitForCondition} while waiting for all events. */
    private static final long TIMEOUT = 60_000L;

    /** Number of keys that are put into the cache and then removed from it. */
    public static final long DATA_AMOUNT = 10_000L;

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setPeerClassLoadingEnabled(true);

        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setAtomicityMode(ATOMIC);
        cacheCfg.setWriteSynchronizationMode(FULL_ASYNC);
        cacheCfg.setBackups(2);

        cfg.setCacheConfiguration(cacheCfg);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGridsMultiThreaded(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopAllGrids();
    }

    /**
     * Puts and removes {@link #DATA_AMOUNT} keys while two continuous queries are active, waits
     * until the listeners have counted at least that many {@code CREATED} events, then stops
     * grid 0 and verifies that the counted total did not change.
     *
     * @throws Exception If failed.
     */
    public void testDuplicateEvents() throws Exception {
        Ignite grid0 = grid(0);
        Ignite grid1 = grid(1);

        ClusterNode node0 = grid0.cluster().localNode();
        ClusterNode node1 = grid1.cluster().localNode();

        IgniteCache<String, Long> cache = grid0.getOrCreateCache(DEFAULT_CACHE_NAME);

        // Declared final because they are read from the anonymous predicate below.
        final EventListener lsnr0 = new EventListener();
        final EventListener lsnr1 = new EventListener();

        ContinuousQuery<String, Long> qry0 = new ContinuousQuery<>();

        qry0.setLocalListener(lsnr0)
            .setRemoteFilterFactory(new RemoteFactory(node0));

        ContinuousQuery<String, Long> qry1 = new ContinuousQuery<>();

        qry1.setLocalListener(lsnr1)
            .setRemoteFilterFactory(new RemoteFactory(node1));

        try (QueryCursor<Cache.Entry<String, Long>> cursor0 = cache.query(qry0);
            QueryCursor<Cache.Entry<String, Long>> cursor1 = cache.query(qry1)) {

            for (long i = 0; i < DATA_AMOUNT; i++) {
                String key = "" + i;

                cache.put(key, i);

                cache.remove(key, i);
            }

            boolean allEvtsRcvd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    return totalEvents(lsnr0, lsnr1) >= DATA_AMOUNT;
                }
            }, TIMEOUT);

            assertTrue("All events are received by listener", allEvtsRcvd);

            // An exact match (not just ">=") is what rules out duplicates.
            assertEquals("All events are received by listener", DATA_AMOUNT, totalEvents(lsnr0, lsnr1));

            stopGrid(0);

            // Give any late (duplicated) events time to arrive before re-checking the counters.
            Thread.sleep(10_000L);

            assertEquals("No new events after stop grid", DATA_AMOUNT, totalEvents(lsnr0, lsnr1));
        }
    }

    /**
     * @param first First listener.
     * @param second Second listener.
     * @return Sum of {@code CREATED} events counted by both listeners.
     */
    private static long totalEvents(EventListener first, EventListener second) {
        return first.evtsCnt() + second.evtsCnt();
    }

    /**
     * Local listener that counts events of type {@link EventType#CREATED} and ignores all others.
     */
    @IgniteAsyncCallback
    public static class EventListener implements CacheEntryUpdatedListener<String, Long> {
        /** Number of {@code CREATED} events seen so far. */
        private final AtomicLong evtsCnt = new AtomicLong();

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Long>> events)
            throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends String, ? extends Long> evt : events) {
                if (evt.getEventType() == EventType.CREATED)
                    evtsCnt.incrementAndGet();
            }
        }

        /**
         * @return Number of {@code CREATED} events counted by this listener.
         */
        public long evtsCnt() {
            return evtsCnt.get();
        }
    }

    /**
     * Factory of remote filters that accept an event only when the filter is evaluated on the
     * node passed to the constructor.
     * <p>
     * Declared {@code static} so that a factory instance holds no reference to the enclosing test
     * instance; only the {@link #node} field travels with the factory when it is serialized.
     */
    public static class RemoteFactory implements Factory<CacheEntryEventFilter<String, Long>> {
        /** Node on which created filters let events through. */
        private final ClusterNode node;

        /**
         * @param node Node on which created filters let events through.
         */
        public RemoteFactory(ClusterNode node) {
            this.node = node;
        }

        /** {@inheritDoc} */
        @Override public CacheEntryEventFilter<String, Long> create() {
            return new CacheEntryEventFilter<String, Long>() {
                /** Ignite instance of the node where this filter runs; injected by Ignite. */
                @IgniteInstanceResource private Ignite ignite;

                /** {@inheritDoc} */
                @Override public boolean evaluate(CacheEntryEvent<? extends String, ? extends Long> evt)
                    throws CacheEntryListenerException {
                    return node.id().equals(ignite.cluster().localNode().id());
                }
            };
        }
    }
}
{code}


was (Author: nizhikov):
Hello, [~ruslangm]

I wrote a reproducer for your issue.

It seems you misused the Continuous Query API.
If you want to get the Ignite instance in a remote filter, you should use the 
@IgniteInstanceResource annotation.
Please take a look at RemoteFactory in the test.

{code:java}
package org.apache.ignite.internal.processors.cache.query.continuous;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;

/**
 * Checks that continuous query listeners do not receive duplicated {@code CREATED} events,
 * including after one of the two started nodes is stopped.
 * <p>
 * Two continuous queries are registered on the same cache. Each query uses a remote filter that
 * accepts an event only when the filter is evaluated on the node handed to its factory. The test
 * expects the total number of {@code CREATED} events seen by both listeners to be exactly
 * {@link #DATA_AMOUNT}.
 */
public class GridCacheContinuousQueryDuplicateEventsTest extends GridCommonAbstractTest implements Serializable {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Timeout passed to {@link GridTestUtils#waitForCondition} while waiting for all events. */
    private static final long TIMEOUT = 60_000L;

    /** Number of keys that are put into the cache and then removed from it. */
    public static final long DATA_AMOUNT = 10_000L;

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setPeerClassLoadingEnabled(true);

        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setAtomicityMode(ATOMIC);
        cacheCfg.setWriteSynchronizationMode(FULL_ASYNC);
        cacheCfg.setBackups(2);

        cfg.setCacheConfiguration(cacheCfg);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGridsMultiThreaded(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopAllGrids();
    }

    /**
     * Puts and removes {@link #DATA_AMOUNT} keys while two continuous queries are active, waits
     * until the listeners have counted at least that many {@code CREATED} events, then stops
     * grid 0 and verifies that the counted total did not change.
     *
     * @throws Exception If failed.
     */
    public void testDuplicateEvents() throws Exception {
        Ignite grid0 = grid(0);
        Ignite grid1 = grid(1);

        ClusterNode node0 = grid0.cluster().localNode();
        ClusterNode node1 = grid1.cluster().localNode();

        IgniteCache<String, Long> cache = grid0.getOrCreateCache(DEFAULT_CACHE_NAME);

        // Declared final because they are read from the anonymous predicate below.
        final EventListener lsnr0 = new EventListener();
        final EventListener lsnr1 = new EventListener();

        ContinuousQuery<String, Long> qry0 = new ContinuousQuery<>();

        qry0.setLocalListener(lsnr0)
            .setRemoteFilterFactory(new RemoteFactory(node0));

        ContinuousQuery<String, Long> qry1 = new ContinuousQuery<>();

        qry1.setLocalListener(lsnr1)
            .setRemoteFilterFactory(new RemoteFactory(node1));

        try (QueryCursor<Cache.Entry<String, Long>> cursor0 = cache.query(qry0);
            QueryCursor<Cache.Entry<String, Long>> cursor1 = cache.query(qry1)) {

            for (long i = 0; i < DATA_AMOUNT; i++) {
                String key = "" + i;

                cache.put(key, i);

                cache.remove(key, i);
            }

            boolean allEvtsRcvd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    return totalEvents(lsnr0, lsnr1) >= DATA_AMOUNT;
                }
            }, TIMEOUT);

            assertTrue("All events are received by listener", allEvtsRcvd);

            // An exact match (not just ">=") is what rules out duplicates.
            assertEquals("All events are received by listener", DATA_AMOUNT, totalEvents(lsnr0, lsnr1));

            stopGrid(0);

            // Give any late (duplicated) events time to arrive before re-checking the counters.
            Thread.sleep(10_000L);

            assertEquals("No new events after stop grid", DATA_AMOUNT, totalEvents(lsnr0, lsnr1));
        }
    }

    /**
     * @param first First listener.
     * @param second Second listener.
     * @return Sum of {@code CREATED} events counted by both listeners.
     */
    private static long totalEvents(EventListener first, EventListener second) {
        return first.evtsCnt() + second.evtsCnt();
    }

    /**
     * Local listener that counts events of type {@link EventType#CREATED} and ignores all others.
     */
    @IgniteAsyncCallback
    public static class EventListener implements CacheEntryUpdatedListener<String, Long> {
        /** Number of {@code CREATED} events seen so far. */
        private final AtomicLong evtsCnt = new AtomicLong();

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Long>> events)
            throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends String, ? extends Long> evt : events) {
                if (evt.getEventType() == EventType.CREATED)
                    evtsCnt.incrementAndGet();
            }
        }

        /**
         * @return Number of {@code CREATED} events counted by this listener.
         */
        public long evtsCnt() {
            return evtsCnt.get();
        }
    }

    /**
     * Factory of remote filters that accept an event only when the filter is evaluated on the
     * node passed to the constructor.
     * <p>
     * Declared {@code static} so that a factory instance holds no reference to the enclosing test
     * instance; only the {@link #node} field travels with the factory when it is serialized.
     */
    public static class RemoteFactory implements Factory<CacheEntryEventFilter<String, Long>> {
        /** Node on which created filters let events through. */
        private final ClusterNode node;

        /**
         * @param node Node on which created filters let events through.
         */
        public RemoteFactory(ClusterNode node) {
            this.node = node;
        }

        /** {@inheritDoc} */
        @Override public CacheEntryEventFilter<String, Long> create() {
            return new CacheEntryEventFilter<String, Long>() {
                /** Ignite instance of the node where this filter runs; injected by Ignite. */
                @IgniteInstanceResource private Ignite ignite;

                /** {@inheritDoc} */
                @Override public boolean evaluate(CacheEntryEvent<? extends String, ? extends Long> evt)
                    throws CacheEntryListenerException {
                    return node.id().equals(ignite.cluster().localNode().id());
                }
            };
        }
    }
}
{code}

> Duplicated events with type CREATED in ContinuousQuery's Events Listener 
> -------------------------------------------------------------------------
>
>                 Key: IGNITE-8035
>                 URL: https://issues.apache.org/jira/browse/IGNITE-8035
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache
>    Affects Versions: 2.4
>            Reporter: Ruslan Gilemzyanov
>            Assignee: Nikolay Izhikov
>            Priority: Major
>
> We encountered a bug in how ContinuousQuery's EventListener works in Ignite. I wrote 
> a sample project to demonstrate it.
> We started 2 server nodes connected to the same cache.
> Topology snapshot became [ver=2, servers=2, clients=0, CPUs=4, heap=3.6GB]
> I put elements into the cache (about 50 elements). The elements were distributed 
> approximately evenly between the two nodes.
> After pushing each element to the cache we waited 100ms (to ensure that the listener 
> had done its work) and deleted the element from the cache. 
> Then we stopped one node. (Topology snapshot became [ver=3, servers=1, 
> clients=0, CPUs=4, heap=1.8GB])
> And then some seemingly randomly chosen events (for entries already deleted from the cache 
> by this moment) arrived at the other, still-running node with status CREATED. In our case 
> it was 5 events.
> I think this is a direct violation of Continuous Query's "exactly once 
> delivery" contract. 
> Source code is here: [https://github.com/ruslangm/ignite-sample]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to