[ 
https://issues.apache.org/jira/browse/IGNITE-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418107#comment-16418107
 ] 

Ruslan Gilemzyanov edited comment on IGNITE-8035 at 3/28/18 9:07 PM:
---------------------------------------------------------------------

[~NIzhikov] Hello, Nikolay. Thanks for your feedback.

I tried your advices on my local machine. Despite the fact that I couldn't run 
concretely your test (I build tests from ignite sources but 
GridCommonsAbstractTest's contructor doesn't want to initialize because of 
missing tests.properties file) I wrote own test closest to yours (without your 
test framework) and I still observe this behavior. In the second assert every 
time I get the following (actual value may differs):
{code:java}
java.lang.AssertionError: All events are received by listener 
Expected :10000
Actual :10067{code}
{code:java}
package ruslangm.sample.ignite.servers;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.log4j.BasicConfigurator;
import org.junit.Test;
import ruslangm.sample.ignite.listener.EventListener;
import ruslangm.sample.ignite.listener.RemoteFactory;

import javax.cache.Cache;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class Server {
    private static final long DATA_AMOUNT = 10_000L;
    private static final long TIMEOUT = 30_000L;

    @Test
    public void test() throws InterruptedException, 
IgniteInterruptedCheckedException {
        BasicConfigurator.configure();
        Ignite grid0 = Ignition.start("server1.xml");
        Ignite grid1 = Ignition.start("server2.xml");

        CacheConfiguration<String, Long> cfg = new CacheConfiguration<>();
        cfg.setCacheMode(CacheMode.PARTITIONED);
        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
          
cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
        cfg.setName("myCache");
        cfg.setBackups(2);

        IgniteCache<String, Long> cache = grid0.getOrCreateCache(cfg);

        ClusterNode node0 = grid0.cluster().localNode();
        ClusterNode node1 = grid1.cluster().localNode();

        ContinuousQuery<String, Long> qry0 = new ContinuousQuery<>();
        EventListener lsnr0 = new EventListener();
        qry0.setLocalListener(lsnr0).setRemoteFilterFactory(new 
RemoteFactory(node0));

        ContinuousQuery<String, Long> qry1 = new ContinuousQuery<>();
        EventListener lsnr1 = new EventListener();
        qry1.setLocalListener(lsnr1).setRemoteFilterFactory(new 
RemoteFactory(node1));

        try (QueryCursor<Cache.Entry<String, Long>> cursor0 = cache.query(qry0);
             QueryCursor<Cache.Entry<String, Long>> cursor1 = 
cache.query(qry1)) {
            for (long i = 0; i < DATA_AMOUNT; i++) {
                cache.put("" + i, i);
                cache.remove("" + i, i);
            }

            boolean allEvtsRcvd = GridTestUtils.waitForCondition(new 
GridAbsPredicate() {
                @Override
                public boolean apply() {
                    return lsnr0.evtsCount() + lsnr1.evtsCount() >= DATA_AMOUNT;
                }
            }, TIMEOUT);

            assertTrue("All events are received by listener", allEvtsRcvd);

            assertEquals("All events are received by listener", DATA_AMOUNT, 
lsnr0.evtsCount() + lsnr1.evtsCount());

            grid0.close();

            Thread.sleep(10_000L);

            assertEquals("No new events after stop grid", DATA_AMOUNT, 
lsnr0.evtsCount() + lsnr1.evtsCount());
        }
    }

@IgniteAsyncCallback
public class EventListener implements CacheEntryUpdatedListener<String, Long> {
    private final AtomicLong evtsCnt = new AtomicLong();

    @Override
    public void onUpdated(
            Iterable<CacheEntryEvent<? extends String, ? extends Long>> events) 
throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends String, ? extends Long> event : events) {
            if (event.getEventType() == EventType.CREATED) {
                evtsCnt.incrementAndGet();
            }
        }
    }

    public long evtsCount() {
        return evtsCnt.get();
    }
}

@IgniteAsyncCallback
public class RemoteFactory implements Factory<CacheEntryEventFilter<String, 
Long>> {
    private final ClusterNode node;

    public RemoteFactory(ClusterNode node) {
        this.node = node;
    }

    @Override
    public CacheEntryEventFilter<String, Long> create() {
        return new CacheEntryEventFilter<String, Long>() {
            @IgniteInstanceResource
            private Ignite ignite;

            @Override
            public boolean evaluate(CacheEntryEvent<? extends String, ? extends 
Long> cacheEntryEvent) {
                return node.id().equals(ignite.cluster().localNode().id());
            }
        };
    }
}
}
{code}
My server1.xml and server2.xml looks so (they differ only in 
igniteInstanceName):
{code:java}
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xmlns:util="http://www.springframework.org/schema/util";
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd";>
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <!--
        Explicitly configure TCP discovery SPI to provide list of
        initial nodes from the first cluster.
        -->
        <property name="igniteInstanceName" value="grid1"/>
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Initial local port to listen to. -->
                <property name="localPort" value="48500"/>
                <!-- Changing local port range. This is an optional action. -->
                <property name="localPortRange" value="20"/>

                <!-- Setting up IP finder for this cluster -->
                <property name="ipFinder">
                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!--
                                Addresses and port range of nodes from
                                the first cluster.
                                127.0.0.1 can be replaced with actual IP 
addresses
                                or host names. Port range is optional.
                                -->
                                <value>127.0.0.1:48500..48520</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

        <!--
        Explicitly configure TCP communication SPI changing local
        port number for the nodes from the first cluster.
        -->
        <property name="communicationSpi">
            <bean 
class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="localPort" value="48100"/>
            </bean>
        </property>
    </bean>
</beans>
{code}
 


was (Author: ruslangm):
[~NIzhikov] Hello, Nikolay. Thanks for your feedback.

I tried your advices on my local machine. Despite the fact that I couldn't run 
concretely your test (I build tests from ignite sources but 
GridCommonsAbstractTest's contructor doesn't want to initialize because of 
missing tests.properties file) I wrote own test closest to yours (without your 
test framework) and I still observe this behavior. In the second assert every 
time I get the following (actual value may differs):
{code:java}
java.lang.AssertionError: All events are received by listener 
Expected :10000
Actual :10067{code}
 
{code:java}
package ruslangm.sample.ignite.servers;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.log4j.BasicConfigurator;
import org.junit.Test;
import ruslangm.sample.ignite.listener.EventListener;
import ruslangm.sample.ignite.listener.RemoteFactory;

import javax.cache.Cache;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class Server {
    private static final long DATA_AMOUNT = 10_000L;
    private static final long TIMEOUT = 30_000L;

    @Test
    public void test() throws InterruptedException, 
IgniteInterruptedCheckedException {
        BasicConfigurator.configure();
        Ignite grid0 = Ignition.start("server1.xml");
        Ignite grid1 = Ignition.start("server2.xml");

        CacheConfiguration<String, Long> cfg = new CacheConfiguration<>();
        cfg.setCacheMode(CacheMode.PARTITIONED);
        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
          
cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
        cfg.setName("myCache");
        cfg.setBackups(2);

        IgniteCache<String, Long> cache = grid0.getOrCreateCache(cfg);

        ClusterNode node0 = grid0.cluster().localNode();
        ClusterNode node1 = grid1.cluster().localNode();

        ContinuousQuery<String, Long> qry0 = new ContinuousQuery<>();
        EventListener lsnr0 = new EventListener();
        qry0.setLocalListener(lsnr0).setRemoteFilterFactory(new 
RemoteFactory(node0));

        ContinuousQuery<String, Long> qry1 = new ContinuousQuery<>();
        EventListener lsnr1 = new EventListener();
        qry1.setLocalListener(lsnr1).setRemoteFilterFactory(new 
RemoteFactory(node1));

        try (QueryCursor<Cache.Entry<String, Long>> cursor0 = cache.query(qry0);
             QueryCursor<Cache.Entry<String, Long>> cursor1 = 
cache.query(qry1)) {
            for (long i = 0; i < DATA_AMOUNT; i++) {
                cache.put("" + i, i);
                cache.remove("" + i, i);
            }

            boolean allEvtsRcvd = GridTestUtils.waitForCondition(new 
GridAbsPredicate() {
                @Override
                public boolean apply() {
                    return lsnr0.evtsCount() + lsnr1.evtsCount() >= DATA_AMOUNT;
                }
            }, TIMEOUT);

            assertTrue("All events are received by listener", allEvtsRcvd);

            assertEquals("All events are received by listener", DATA_AMOUNT, 
lsnr0.evtsCount() + lsnr1.evtsCount());

            grid0.close();

            Thread.sleep(10_000L);

            assertEquals("No new events after stop grid", DATA_AMOUNT, 
lsnr0.evtsCount() + lsnr1.evtsCount());
        }
    }

@IgniteAsyncCallback
public class EventListener implements CacheEntryUpdatedListener<String, Long> {
    private final AtomicLong evtsCnt = new AtomicLong();

    @Override
    public void onUpdated(
            Iterable<CacheEntryEvent<? extends String, ? extends Long>> events) 
throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends String, ? extends Long> event : events) {
            if (event.getEventType() == EventType.CREATED) {
                evtsCnt.incrementAndGet();
            }
        }
    }

    public long evtsCount() {
        return evtsCnt.get();
    }
}

@IgniteAsyncCallback
public class RemoteFactory implements Factory<CacheEntryEventFilter<String, 
Long>> {
    private final ClusterNode node;

    public RemoteFactory(ClusterNode node) {
        this.node = node;
    }

    @Override
    public CacheEntryEventFilter<String, Long> create() {
        return new CacheEntryEventFilter<String, Long>() {
            @IgniteInstanceResource
            private Ignite ignite;

            @Override
            public boolean evaluate(CacheEntryEvent<? extends String, ? extends 
Long> cacheEntryEvent) {
                return node.id().equals(ignite.cluster().localNode().id());
            }
        };
    }
}
}
{code}
My server1.xml and server2.xml looks so (they differs only in 
igniteInstanceName):

 

 
{code:java}
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xmlns:util="http://www.springframework.org/schema/util";
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd";>
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <!--
        Explicitly configure TCP discovery SPI to provide list of
        initial nodes from the first cluster.
        -->
        <property name="igniteInstanceName" value="grid1"/>
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Initial local port to listen to. -->
                <property name="localPort" value="48500"/>
                <!-- Changing local port range. This is an optional action. -->
                <property name="localPortRange" value="20"/>

                <!-- Setting up IP finder for this cluster -->
                <property name="ipFinder">
                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!--
                                Addresses and port range of nodes from
                                the first cluster.
                                127.0.0.1 can be replaced with actual IP 
addresses
                                or host names. Port range is optional.
                                -->
                                <value>127.0.0.1:48500..48520</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

        <!--
        Explicitly configure TCP communication SPI changing local
        port number for the nodes from the first cluster.
        -->
        <property name="communicationSpi">
            <bean 
class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="localPort" value="48100"/>
            </bean>
        </property>
    </bean>
</beans>
{code}
 

> Duplicated events with type CREATED in ContinuousQuery's Events Listener 
> -------------------------------------------------------------------------
>
>                 Key: IGNITE-8035
>                 URL: https://issues.apache.org/jira/browse/IGNITE-8035
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache
>    Affects Versions: 2.4
>            Reporter: Ruslan Gilemzyanov
>            Assignee: Nikolay Izhikov
>            Priority: Major
>
> We faced with bug in ContinuousQuery's EventListener work in Ignite. I wrote 
> sample project to demonstrate it.
> We started 2 server nodes connected to the one cache.
> Topology snapshot became [ver=2, servers=2, clients=0, CPUs=4, heap=3.6GB]
> I have put elements in cache (about 50 elements). Elements were distributed 
> between two nodes approxiamtely in the same amount.
> After pushing every element to cache we waited 100ms (to ensure that Listener 
> did his work) and deleted element from cache. 
> Then we stopped one node. (Topology snapshot became [ver=3, servers=1, 
> clients=0, CPUs=4, heap=1.8GB])
> And then some absolutely randomly chosen (deleted from cache to this moment) 
> events came to other working node with status CREATED (Remind you that we 
> deleted them from cache to this moment). In our case it was 5 events.
> I think this is direct violation of Continuous Query's "exactly once 
> delivery" contract. 
> Source code is here: [https://github.com/ruslangm/ignite-sample]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to