Vladimir Steshin created IGNITE-17369:
-----------------------------------------

             Summary: Snapshot is inconsistent under streamed loading.
                 Key: IGNITE-17369
                 URL: https://issues.apache.org/jira/browse/IGNITE-17369
             Project: Ignite
          Issue Type: Improvement
            Reporter: Vladimir Steshin
         Attachments: IgniteClusterShanpshotStreamerTest.java

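Ignite fails to restore a snapshot that was created while data was being loaded through a streamer. The copies of the same partition on different nodes differ in update counter, size and hash: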


{code:java}
Conflict partition: PartitionKeyV2 [grpId=109386747, 
grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, 
partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 
[isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
Conflict partition: PartitionKeyV2 [grpId=109386747, 
grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, 
partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 
[isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
{code}

Test (attached):

{code:java}
    /**
     * Reproducer: takes a cluster snapshot while a background thread keeps inserting rows
     * through JDBC in streaming mode, then destroys the cache and restores it from the snapshot.
     *
     * @throws Exception If the cluster cannot be started, or loading, snapshot creation or restore fails.
     */
    public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
        int grids = 2;
        String tableName = "TEST_TBL1";
        // Single source of truth for the cache name used by both destroy and restore.
        String cacheName = "SQL_PUBLIC_" + tableName;

        // The snapshot is started only after this many rows have been submitted by the loader.
        CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
        AtomicBoolean stopLoading = new AtomicBoolean(false);

        dfltCacheCfg = null;
        Class.forName("org.apache.ignite.IgniteJdbcDriver");

        startGrids(grids);
        grid(0).cluster().state(ACTIVE);

        IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot);

        try {
            loadNumberBeforeSnapshot.await();

            // The load thread is still running at this point: the snapshot is taken under load.
            grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
        }
        finally {
            // Always signal the loader, otherwise it keeps spinning when snapshot creation fails.
            stopLoading.set(true);
        }

        load1.get();

        grid(0).cache(cacheName).destroy();

        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList(cacheName)).get();
    }

    /** */
    private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, 
int backups, boolean streaming, AtomicBoolean stop,
        CountDownLatch startSnp) {
        return GridTestUtils.runMultiThreadedAsync(() -> {
            if(useCache) {
                String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();

                IgniteCache<Integer, Object> cache = grid(0)
                    .createCache(new CacheConfiguration<Integer, 
Object>(cacheName).setBackups(backups)
                        .setCacheMode(CacheMode.REPLICATED));

                try (IgniteDataStreamer<Integer, Object> ds = 
grid(0).dataStreamer(cacheName)) {
                    for (int i = 0; !stop.get(); ++i) {
                        if (streaming)
                            ds.addData(i, new Account(i, i - 1));
                        else
                            cache.put(i, new Account(i, i - 1));

                        if (startSnp.getCount() > 0)
                            startSnp.countDown();

                        Thread.yield();
                    }
                }
            } else {
                try (Connection conn = 
DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
                    createTable(conn, tblName, backups);

                    try (PreparedStatement stmt = conn.prepareStatement("INSERT 
INTO " + tblName +
                        "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {

                        if (streaming)
                            conn.prepareStatement("SET STREAMING 
ON;").execute();

                        int leftLimit = 97; // letter 'a'
                        int rightLimit = 122; // letter'z'
                        int targetStringLength = 15;
                        Random rand = new Random();
//
                        for (int i = 0; !stop.get(); ++i) {
                            int orgid = rand.ints(1, 0, 
5).findFirst().getAsInt();

                            String val = rand.ints(leftLimit, rightLimit + 
1).limit(targetStringLength)
                                .collect(StringBuilder::new, 
StringBuilder::appendCodePoint, StringBuilder::append)
                                .toString();
                            stmt.setInt(1, i);
                            stmt.setString(2, val);
                            stmt.setInt(3, orgid);
                            stmt.setInt(4, 0);

                            stmt.executeUpdate();

                            if (startSnp.getCount() > 0)
                                startSnp.countDown();

                            Thread.yield();
                        }
                    }
                }
                catch (Exception e) {
                    while (startSnp.getCount() > 0)
                        startSnp.countDown();

                    throw new IgniteException("Unable to load.", e);
                }
            }
        }, 1, "load-thread-" + tblName);
    }
{code}





--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to