
Vladimir Steshin edited comment on IGNITE-17369 at 10/10/22 9:09 AM:

Snapshot can begin work with different state of kin partitions. The shapshot 
process waits for the datastreamer futures. 
(_GridCacheMvccManager.addDataStreamerFuture()_). The problem is that these 
futures are created separately and concurrently on primary and backups nodes by 
_IsolatedUpdater_. As result, at the checkpoint some backups might be written 
without the primaries. And opposite. There are no updates accepted during 
checkpoint. Late streamer updates is not written to snapshoting partitions.


1) V1 (PR 10285). 
PR brings watching _DataStreamer_ futures in snapshot process. The futures are 
created before writing streamer batch on any node. We cannot relay on the 
future as on final and consistent write for streamer batch or certain entry. 
But we know that datastreamer is in progress at the checkpoint and that it is 
on pause. We can invalidate snapshot at this moment.
In theory the solution is not resilent. On streamer batch could've been 
entirely written before snapshot. Second batch after. First batch writes 
partition on primaries or backups. Second writes the rest. Snapshot is 

2) V2 (PR 10286).
_IsolatedUpdater_ could just notify snapshot process, if exists, that 
concurrent inconsistent update is on. A notification of at least one entry on 
any node wound be enough. Should work in practice. In theory the solution is 
not resilent. On streamer batch could've been entirely written before snapshot. 
Second batch after. First batch writes partition on primaries or backups. 
Second writes the rest. Snapshot is inconsistent.

3) V3 (PR 10284).
We could mark that _DataStreamer_ is on on any first streamer batch received. 
And unmark somehow later. If _DataStreamer_ is marked as active, the snapshot 
process could check this mark. Since the mark is set before writting data, it 
is set before the datastreamer future which is being waited for in the snapshot 
process. This guaraties the mark are visible before the snapshot.

The problem is how to close such mark. When the streaming node left? Node can 
live forever. Send special closing request? The streamer node can do not close 
streamer at all. Meaning no _close()_ is invoked. Moreoever, _DataStreamer_ 
works through _CommunicationSPI_. Which doesn't guarantee delivery. We can't be 
sure that closing request is delivered and streamer is unmarked on the 
accepting node. Do we need to set this mark with a timeout and re-set with next 
datastreamer batche? Which timeout? Bind to what? 
On closing requests, a rebalance can happen. Should be processed too. Looks 
like we need a discovery closing message. Much simpler and reliable. 
Also, datastreamer can be canceled. Meaning one batches were written before 
snapshot. Other won't ever. 

4) V4 (PR 10299).
We could watch streamer is already registered before snapshot and 
simultaneously. The snapshot init message is accepted on client. We may adjust 
snapshot proces so that it would watch datastreamer even on client node. 

was (Author: vladsz83):
Snapshot can begin work with different state of kin partitions. The shapshot 
process waits for the datastreamer futures. 
(_GridCacheMvccManager.addDataStreamerFuture()_). The problem is that these 
futures are created separately and concurrently on primary and backups nodes by 
_IsolatedUpdater_. As result, at the checkpoint some backups might be written 
without the primaries. And opposite. There are no updates accepted during 
checkpoint. Late streamer updates is not written to snapshoting partitions.


1) V1 (PR 10285). 
PR brings watching _DataStreamer_ futures in snapshot process. The futures are 
created before writing streamer batch on any node. We cannot relay on the 
future as on final and consistent write for streamer batch or certain entry. 
But we know that datastreamer is in progress at the checkpoint and that it is 
on pause. We can invalidate snapshot at this moment.
In theory the solution is not resilent. On streamer batch could've been 
entirely written before snapshot. Second batch after. First batch writes 
partition on primaries or backups. Second writes the rest. Snapshot is 

2) V2 (PR 10286).
_IsolatedUpdater_ could just notify snapshot process, if exists, that 
concurrent inconsistent update is on. A notification of at least one entry on 
any node wound be enough. Should work in practice. In theory the solution is 
not resilent. On streamer batch could've been entirely written before snapshot. 
Second batch after. First batch writes partition on primaries or backups. 
Second writes the rest. Snapshot is inconsistent.

3) V3 (PR 10284).
We could mark that _DataStreamer_ is on on any first streamer batch received. 
And unmark somehow later. If _DataStreamer_ is marked as active, the snapshot 
process could check this mark. Since the mark is set before writting data, it 
is set before the datastreamer future which is being waited for in the snapshot 
process. This guaraties the mark are visible before the snapshot.

The problem is how to close such mark. When the streaming node left? Node can 
live forever. Send special closing request? The streamer node can do not close 
streamer at all. Meaning no _close()_ is invoked. Moreoever, _DataStreamer_ 
works through _CommunicationSPI_. Which doesn't guarantee delivery. We can't be 
sure that closing request is delivered and streamer is unmarked on the 
accepting node. Do we need to set this mark with a timeout and re-set with next 
datastreamer batche? Which timeout? Bind to what? 
On closing requests, a rebalance can happen. Should be processed too. Looks 
like we need a discovery closing message. Much simpler and reliable. 
Also, datastreamer can be canceled. Meaning one batches were written before 
snapshot. Other won't ever. 

This solution looks hazardous.

> Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
> -----------------------------------------------------------------------------
>                 Key: IGNITE-17369
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17369
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Vladimir Steshin
>            Assignee: Vladimir Steshin
>            Priority: Major
>              Labels: ise, ise.lts
>         Attachments: IgniteClusterShanpshotStreamerTest.java
> Ignite fails to restore snapshot created under streamed load:
> {code:java}
> Conflict partition: PartitionKeyV2 [grpId=109386747, 
> grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
> consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, 
> partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 
> [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
> updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
> Conflict partition: PartitionKeyV2 [grpId=109386747, 
> grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
> consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, 
> partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 
> [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
> updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
> {code}
> Test (attached):
> {code:java}
>     public void testClusterSnapshotConsistencyWithStreamer() throws Exception 
> {
>         int grids = 2;
>         CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
>         AtomicBoolean stopLoading = new AtomicBoolean(false);
>         dfltCacheCfg = null;
>         Class.forName("org.apache.ignite.IgniteJdbcDriver");
>         String tableName = "TEST_TBL1";
>         startGrids(grids);
>         grid(0).cluster().state(ACTIVE);
>         IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, 
> stopLoading, loadNumberBeforeSnapshot);
>         loadNumberBeforeSnapshot.await();
>         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
>         stopLoading.set(true);
>         load1.get();
>         grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
>         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, 
> F.asList("SQL_PUBLIC_TEST_TBL1")).get();
>     }
>     /** */
>     private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, 
> int backups, boolean streaming, AtomicBoolean stop,
>         CountDownLatch startSnp) {
>         return GridTestUtils.runMultiThreadedAsync(() -> {
>             if(useCache) {
>                 String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
>                 IgniteCache<Integer, Object> cache = grid(0)
>                     .createCache(new CacheConfiguration<Integer, 
> Object>(cacheName).setBackups(backups)
>                         .setCacheMode(CacheMode.REPLICATED));
>                 try (IgniteDataStreamer<Integer, Object> ds = 
> grid(0).dataStreamer(cacheName)) {
>                     for (int i = 0; !stop.get(); ++i) {
>                         if (streaming)
>                             ds.addData(i, new Account(i, i - 1));
>                         else
>                             cache.put(i, new Account(i, i - 1));
>                         if (startSnp.getCount() > 0)
>                             startSnp.countDown();
>                         Thread.yield();
>                     }
>                 }
>             } else {
>                 try (Connection conn = 
> DriverManager.getConnection("jdbc:ignite:thin://")) {
>                     createTable(conn, tblName, backups);
>                     try (PreparedStatement stmt = 
> conn.prepareStatement("INSERT INTO " + tblName +
>                         "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
>                         if (streaming)
>                             conn.prepareStatement("SET STREAMING 
> ON;").execute();
>                         int leftLimit = 97; // letter 'a'
>                         int rightLimit = 122; // letter'z'
>                         int targetStringLength = 15;
>                         Random rand = new Random();
> //
>                         for (int i = 0; !stop.get(); ++i) {
>                             int orgid = rand.ints(1, 0, 
> 5).findFirst().getAsInt();
>                             String val = rand.ints(leftLimit, rightLimit + 
> 1).limit(targetStringLength)
>                                 .collect(StringBuilder::new, 
> StringBuilder::appendCodePoint, StringBuilder::append)
>                                 .toString();
>                             stmt.setInt(1, i);
>                             stmt.setString(2, val);
>                             stmt.setInt(3, orgid);
>                             stmt.setInt(4, 0);
>                             stmt.executeUpdate();
>                             if (startSnp.getCount() > 0)
>                                 startSnp.countDown();
>                             Thread.yield();
>                         }
>                     }
>                 }
>                 catch (Exception e) {
>                     while (startSnp.getCount() > 0)
>                         startSnp.countDown();
>                     throw new IgniteException("Unable to load.", e);
>                 }
>             }
>         }, 1, "load-thread-" + tblName);
>     }
> {code}

This message was sent by Atlassian Jira

Reply via email to