Hello, Yes, scan queries do not honor transaction guarantees.
Thanks, S. ср, 27 дек. 2023 г. в 14:43, 38797715 <38797...@qq.com>: > Hi, > > The demo is as follows and attached, It seems that ScanQuery has read the > intermediate data of the transaction. > > Execute the following code: > @Slf4j > public class QueryTest { > public static void main(String[] args) { > Ignite ignite = Ignition.start(); > testRange(ignite); > } > public static void testRange(Ignite ignite) { > String cacheName = "positionCache"; > CacheConfiguration<String, Position> cfg = new CacheConfiguration<>( > cacheName); > cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); > final IgniteCache<String, Position> cache = ignite.createCache(cfg); > CountDownLatch writeCountDownLatch = new CountDownLatch(1); > CountDownLatch readCountDownLatch = new CountDownLatch(1); > boolean writeThreadStopFlag = false; > boolean readThreadStopFlag = false; > final Long combiId = 123l; > Task writeTask = new Task(ignite, writeCountDownLatch) { > @Override > protected void invoke() { > int count = 1; > while (!writeThreadStopFlag && count <= 100) { > try (Transaction transaction = ignite.transactions().txStart( > TransactionConcurrency.PESSIMISTIC, > TransactionIsolation.REPEATABLE_READ)) { > for (int i = 0; i < 1000; i++) { > Position position = new Position(combiId, 10000 + i, "000001", BigDecimal. > valueOf(1000+count)); > cache.put(position.getKey(), position); > } > transaction.commit(); > log.info("{}write success",count); > } catch (Exception e) { > log.error("write error", e); > } > count++; > } > countDownLatch.countDown(); > } > }; > Task.start(writeTask); > ScanQuery<String, Position> scanQuery = new ScanQuery<>((k, v) -> combiId. > equals(v.getCombiId())); > Task readTask = new Task(ignite, readCountDownLatch) { > @Override > protected void invoke() { > int count = 1; > int tryTimes = 300; > int unmatchCount = 0; > while (!readThreadStopFlag && count <= tryTimes) { > List<Position> positionDetails = new ArrayList<>(); > IgniteCache<String, Position> cache = ignite.cache(cacheName); > try (QueryCursor<Cache.Entry<String, Position>> cursor = cache.query( > scanQuery)) { > List<Cache.Entry<String, Position>> list = cursor.getAll(); > list.forEach(x -> positionDetails.add(x.getValue())); > } > if (positionDetails.size() > 0) { > BigDecimal currentQuantity = positionDetails.get(0).getQuantity(); > List<Position> unmatchDetails = positionDetails.stream() > .filter(x -> !currentQuantity.equals(x.getQuantity())).collect(Collectors. > toList()); > if (unmatchDetails.isEmpty()) { > log.info("{}query success", count); > } else { > log.error("{} query,validate error,Quantity[{},{}] inconsistent, > inconsistent size:{}", count, currentQuantity, > unmatchDetails.stream().map(x -> x.getQuantity()).distinct().collect( > Collectors.toList()), > unmatchDetails.size()); > unmatchCount++; > } > } > try { > if (writeCountDownLatch.await(50, TimeUnit.MILLISECONDS)) { > break; > } > } catch (InterruptedException e) { > log.error("query error", e); > } > count++; > } > countDownLatch.countDown(); > } > }; > Task.start(readTask); > } > } > > >