Hello,

Yes, scan queries do not honor transaction guarantees.

Thanks,
S.

ср, 27 дек. 2023 г. в 14:43, 38797715 <38797...@qq.com>:

> Hi,
>
> The demo is shown below and is also attached. It seems that ScanQuery has
> read the intermediate data of the transaction.
>
> Execute the following code:
> /**
>  * Demo that checks whether a {@code ScanQuery} can return a mix of values
>  * written by different transactions.
>  *
>  * <p>A writer task commits up to 100 transactions; each one puts 1000
>  * {@code Position} objects that all carry the same quantity
>  * ({@code 1000 + count}). A reader task concurrently runs a scan query and
>  * reports an error whenever a single result set contains more than one
>  * distinct quantity.
>  */
> @Slf4j
> public class QueryTest {
>     public static void main(String[] args) {
>         Ignite ignite = Ignition.start();
>         testRange(ignite);
>     }
>
>     /**
>      * Creates a TRANSACTIONAL cache named {@code positionCache} and starts the
>      * writer and reader tasks against it.
>      *
>      * @param ignite the started Ignite node used for the cache, the
>      *               transactions and the scan query
>      */
>     public static void testRange(Ignite ignite) {
>         String cacheName = "positionCache";
>         CacheConfiguration<String, Position> cfg = new CacheConfiguration<>(cacheName);
>         cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>         final IgniteCache<String, Position> cache = ignite.createCache(cfg);
>         CountDownLatch writeCountDownLatch = new CountDownLatch(1);
>         CountDownLatch readCountDownLatch = new CountDownLatch(1);
>         // Never reassigned in this demo; kept as explicit loop guards.
>         boolean writeThreadStopFlag = false;
>         boolean readThreadStopFlag = false;
>         final Long combiId = 123L;
>
>         Task writeTask = new Task(ignite, writeCountDownLatch) {
>             @Override
>             protected void invoke() {
>                 int count = 1;
>                 while (!writeThreadStopFlag && count <= 100) {
>                     try (Transaction transaction = ignite.transactions().txStart(
>                             TransactionConcurrency.PESSIMISTIC,
>                             TransactionIsolation.REPEATABLE_READ)) {
>                         // Every entry written by this transaction gets the same quantity.
>                         for (int i = 0; i < 1000; i++) {
>                             Position position = new Position(combiId, 10000 + i, "000001",
>                                     BigDecimal.valueOf(1000 + count));
>                             cache.put(position.getKey(), position);
>                         }
>                         transaction.commit();
>                         log.info("{}write success", count);
>                     } catch (Exception e) {
>                         log.error("write error", e);
>                     }
>                     count++;
>                 }
>                 // Signals the reader (which polls writeCountDownLatch) that writing is done.
>                 countDownLatch.countDown();
>             }
>         };
>         Task.start(writeTask);
>
>         ScanQuery<String, Position> scanQuery =
>                 new ScanQuery<>((k, v) -> combiId.equals(v.getCombiId()));
>
>         Task readTask = new Task(ignite, readCountDownLatch) {
>             @Override
>             protected void invoke() {
>                 int count = 1;
>                 int tryTimes = 300;
>                 int unmatchCount = 0;
>                 while (!readThreadStopFlag && count <= tryTimes) {
>                     IgniteCache<String, Position> readCache = ignite.cache(cacheName);
>                     List<Position> positionDetails;
>                     try (QueryCursor<Cache.Entry<String, Position>> cursor =
>                             readCache.query(scanQuery)) {
>                         positionDetails = cursor.getAll().stream()
>                                 .map(Cache.Entry::getValue)
>                                 .collect(Collectors.toList());
>                     }
>                     if (!positionDetails.isEmpty()) {
>                         // Use the first entry as the reference; any other quantity in the
>                         // same result set means values from different transactions were mixed.
>                         BigDecimal currentQuantity = positionDetails.get(0).getQuantity();
>                         List<Position> unmatchDetails = positionDetails.stream()
>                                 .filter(x -> !currentQuantity.equals(x.getQuantity()))
>                                 .collect(Collectors.toList());
>                         if (unmatchDetails.isEmpty()) {
>                             log.info("{}query success", count);
>                         } else {
>                             log.error("{} query,validate error,Quantity[{},{}] inconsistent, inconsistent size:{}",
>                                     count, currentQuantity,
>                                     unmatchDetails.stream()
>                                             .map(Position::getQuantity)
>                                             .distinct()
>                                             .collect(Collectors.toList()),
>                                     unmatchDetails.size());
>                             unmatchCount++;
>                         }
>                     }
>                     try {
>                         // Doubles as the 50 ms pause between queries; stop once the writer is done.
>                         if (writeCountDownLatch.await(50, TimeUnit.MILLISECONDS)) {
>                             break;
>                         }
>                     } catch (InterruptedException e) {
>                         log.error("query error", e);
>                         // Restore the interrupt flag and stop instead of spinning on await().
>                         Thread.currentThread().interrupt();
>                         break;
>                     }
>                     count++;
>                 }
>                 log.info("query finished, inconsistent results: {}", unmatchCount);
>                 countDownLatch.countDown();
>             }
>         };
>         Task.start(readTask);
>     }
> }
>
>
>

Reply via email to