[
https://issues.apache.org/jira/browse/IGNITE-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418107#comment-16418107
]
Ruslan Gilemzyanov edited comment on IGNITE-8035 at 3/28/18 9:07 PM:
---------------------------------------------------------------------
[~NIzhikov] Hello, Nikolay. Thanks for your feedback.
I tried your advice on my local machine. I couldn't run your exact test (I
built the tests from the Ignite sources, but the GridCommonsAbstractTest
constructor fails to initialize because the tests.properties file is missing),
so I wrote my own test as close to yours as possible (without your test
framework), and I still observe this behavior. The second assert fails every
time with the following (the actual value may differ):
{code:java}
java.lang.AssertionError: All events are received by listener
Expected :10000
Actual :10067{code}
{code:java}
package ruslangm.sample.ignite.servers;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.log4j.BasicConfigurator;
import org.junit.Test;
import ruslangm.sample.ignite.listener.EventListener;
import ruslangm.sample.ignite.listener.RemoteFactory;

import javax.cache.Cache;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class Server {
    private static final long DATA_AMOUNT = 10_000L;
    private static final long TIMEOUT = 30_000L;

    @Test
    public void test() throws InterruptedException, IgniteInterruptedCheckedException {
        BasicConfigurator.configure();

        // Start two server nodes.
        Ignite grid0 = Ignition.start("server1.xml");
        Ignite grid1 = Ignition.start("server2.xml");

        CacheConfiguration<String, Long> cfg = new CacheConfiguration<>();
        cfg.setCacheMode(CacheMode.PARTITIONED);
        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
        cfg.setName("myCache");
        cfg.setBackups(2);

        IgniteCache<String, Long> cache = grid0.getOrCreateCache(cfg);

        ClusterNode node0 = grid0.cluster().localNode();
        ClusterNode node1 = grid1.cluster().localNode();

        // One continuous query per node; the remote filter passes an event
        // only when it is evaluated on the node given to the factory.
        ContinuousQuery<String, Long> qry0 = new ContinuousQuery<>();
        EventListener lsnr0 = new EventListener();
        qry0.setLocalListener(lsnr0).setRemoteFilterFactory(new RemoteFactory(node0));

        ContinuousQuery<String, Long> qry1 = new ContinuousQuery<>();
        EventListener lsnr1 = new EventListener();
        qry1.setLocalListener(lsnr1).setRemoteFilterFactory(new RemoteFactory(node1));

        try (QueryCursor<Cache.Entry<String, Long>> cursor0 = cache.query(qry0);
             QueryCursor<Cache.Entry<String, Long>> cursor1 = cache.query(qry1)) {
            // Every key is put once and then removed, so DATA_AMOUNT
            // CREATED events are expected in total.
            for (long i = 0; i < DATA_AMOUNT; i++) {
                cache.put("" + i, i);
                cache.remove("" + i, i);
            }

            boolean allEvtsRcvd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override
                public boolean apply() {
                    return lsnr0.evtsCount() + lsnr1.evtsCount() >= DATA_AMOUNT;
                }
            }, TIMEOUT);

            assertTrue("All events are received by listener", allEvtsRcvd);

            // This is the assertion that fails with the error shown above.
            assertEquals("All events are received by listener", DATA_AMOUNT,
                lsnr0.evtsCount() + lsnr1.evtsCount());

            grid0.close();

            Thread.sleep(10_000L);

            assertEquals("No new events after stop grid", DATA_AMOUNT,
                lsnr0.evtsCount() + lsnr1.evtsCount());
        }
    }

    // EventListener and RemoteFactory are the classes imported above from
    // ruslangm.sample.ignite.listener; they are pasted inline here.

    @IgniteAsyncCallback
    public class EventListener implements CacheEntryUpdatedListener<String, Long> {
        private final AtomicLong evtsCnt = new AtomicLong();

        // Counts only CREATED events; removals are ignored.
        @Override
        public void onUpdated(
            Iterable<CacheEntryEvent<? extends String, ? extends Long>> events)
            throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends String, ? extends Long> event : events) {
                if (event.getEventType() == EventType.CREATED) {
                    evtsCnt.incrementAndGet();
                }
            }
        }

        public long evtsCount() {
            return evtsCnt.get();
        }
    }

    @IgniteAsyncCallback
    public class RemoteFactory implements Factory<CacheEntryEventFilter<String, Long>> {
        private final ClusterNode node;

        public RemoteFactory(ClusterNode node) {
            this.node = node;
        }

        @Override
        public CacheEntryEventFilter<String, Long> create() {
            return new CacheEntryEventFilter<String, Long>() {
                @IgniteInstanceResource
                private Ignite ignite;

                // Accept the event only on the node this factory was created for.
                @Override
                public boolean evaluate(
                    CacheEntryEvent<? extends String, ? extends Long> cacheEntryEvent) {
                    return node.id().equals(ignite.cluster().localNode().id());
                }
            };
        }
    }
}
{code}
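Since every key in the test above is put exactly once, a key that produces more than one CREATED event is a duplicate. The sketch below is only an illustration of how the extra events could be attributed to specific keys; `CreatedEventTracker` is a hypothetical helper written for this comment, not part of Ignite or of the sample project, and it uses nothing beyond the JDK.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical diagnostic helper (not an Ignite class): counts CREATED events per key. */
final class CreatedEventTracker {
    /** Number of CREATED events observed for each key. */
    private final ConcurrentHashMap<String, AtomicInteger> seen = new ConcurrentHashMap<>();

    /**
     * Records one CREATED event for the given key.
     *
     * @return true if this key had already produced a CREATED event before.
     */
    boolean record(String key) {
        return seen.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet() > 1;
    }

    /** Total number of CREATED events recorded, duplicates included. */
    long total() {
        long sum = 0;
        for (AtomicInteger cnt : seen.values())
            sum += cnt.get();
        return sum;
    }

    /** Keys that produced more than one CREATED event, with their counts. */
    Map<String, Integer> duplicates() {
        Map<String, Integer> dups = new TreeMap<>();
        seen.forEach((key, cnt) -> {
            if (cnt.get() > 1)
                dups.put(key, cnt.get());
        });
        return dups;
    }
}
```

One way to use it, under the same assumptions, would be to share a single tracker between both listeners, call `record(event.getKey())` inside `onUpdated` for each CREATED event, and print `duplicates()` after the run to see which keys account for the surplus.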
My server1.xml and server2.xml look like this (they differ only in
igniteInstanceName):
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <!--
            Explicitly configure TCP discovery SPI to provide list of
            initial nodes from the first cluster.
        -->
        <property name="igniteInstanceName" value="grid1"/>
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Initial local port to listen to. -->
                <property name="localPort" value="48500"/>
                <!-- Changing local port range. This is an optional action. -->
                <property name="localPortRange" value="20"/>
                <!-- Setting up IP finder for this cluster -->
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!--
                                    Addresses and port range of nodes from
                                    the first cluster.
                                    127.0.0.1 can be replaced with actual IP
                                    addresses or host names. Port range is
                                    optional.
                                -->
                                <value>127.0.0.1:48500..48520</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        <!--
            Explicitly configure TCP communication SPI changing local
            port number for the nodes from the first cluster.
        -->
        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="localPort" value="48100"/>
            </bean>
        </property>
    </bean>
</beans>
{code}
> Duplicated events with type CREATED in ContinuousQuery's Events Listener
> -------------------------------------------------------------------------
>
> Key: IGNITE-8035
> URL: https://issues.apache.org/jira/browse/IGNITE-8035
> Project: Ignite
> Issue Type: Bug
> Components: cache
> Affects Versions: 2.4
> Reporter: Ruslan Gilemzyanov
> Assignee: Nikolay Izhikov
> Priority: Major
>
> We ran into a bug in the ContinuousQuery EventListener in Ignite. I wrote a
> sample project to demonstrate it.
> We started 2 server nodes connected to one cache.
> The topology snapshot became [ver=2, servers=2, clients=0, CPUs=4, heap=3.6GB].
> I put elements into the cache (about 50 elements). The elements were
> distributed between the two nodes in approximately equal amounts.
> After putting each element into the cache, we waited 100 ms (to ensure that
> the listener had done its work) and deleted the element from the cache.
> Then we stopped one node. (The topology snapshot became [ver=3, servers=1,
> clients=0, CPUs=4, heap=1.8GB].)
> After that, events for some randomly chosen elements arrived at the remaining
> node with type CREATED, even though those elements had already been deleted
> from the cache by that moment. In our case it was 5 events.
> I think this is a direct violation of Continuous Query's "exactly once
> delivery" contract.
> Source code is here: [https://github.com/ruslangm/ignite-sample]
>