This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this 
push:
     new 6ff8e032e9 IGNITE-17967 RO writeIntent resolution tests hang up in 
case of multi node cluster (#1255)
6ff8e032e9 is described below

commit 6ff8e032e9c80630e667f043805212e3299adcaf
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Oct 28 17:30:21 2022 +0300

    IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node 
cluster (#1255)
    
    (cherry picked from commit 7d4bf9747dace1471214aea56caa3148b76a20eb)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |   3 +-
 .../replicator/PartitionReplicaListener.java       | 240 +++++++++++++--------
 .../ignite/internal/table/TxAbstractTest.java      |  10 +-
 .../apache/ignite/internal/table/TxLocalTest.java  |  15 +-
 .../table/impl/DummyInternalTableImpl.java         |  28 ++-
 5 files changed, 183 insertions(+), 113 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index b3fdf34019..8491b3ee99 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -141,7 +141,8 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Transaction manager. */
     private final TxManager txManager;
 
-    private HybridClock clock;
+    /** Clock. */
+    private final HybridClock clock;
 
     /** Constructor. */
     public SqlQueryProcessor(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42334a4117..4953287a6a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -241,25 +241,33 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     } else if (request instanceof 
ReadWriteMultiRowReplicaRequest) {
                         return 
processMultiEntryAction((ReadWriteMultiRowReplicaRequest) request);
                     } else if (request instanceof 
ReadWriteSwapRowReplicaRequest) {
-                        return 
processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request);
+                        return 
processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof 
ReadWriteScanRetrieveBatchReplicaRequest) {
-                        return 
processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) 
request);
+                        return 
processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) 
request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof 
ReadWriteScanCloseReplicaRequest) {
                         
processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
 
                         return completedFuture(null);
                     } else if (request instanceof TxFinishReplicaRequest) {
-                        return processTxFinishAction((TxFinishReplicaRequest) 
request);
+                        return processTxFinishAction((TxFinishReplicaRequest) 
request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof TxCleanupReplicaRequest) {
-                        return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                        return 
processTxCleanupAction((TxCleanupReplicaRequest) request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
-                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, 
isPrimary);
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, 
isPrimary)
+                                .thenApply(Function.identity());
                     } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
-                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, 
isPrimary);
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, 
isPrimary)
+                                .thenApply(Function.identity());
                     } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
-                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request, isPrimary);
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request, isPrimary)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReplicaSafeTimeSyncRequest) {
-                        return 
processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
+                        return 
processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request)
+                                .thenApply(Function.identity());
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
@@ -327,7 +335,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(
+    private CompletableFuture<ArrayList<BinaryRow>> 
processReadOnlyScanRetrieveBatchAction(
             ReadOnlyScanRetrieveBatchReplicaRequest request,
             Boolean isPrimary
     ) {
@@ -339,24 +347,59 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+        CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(readTimestamp);
+
+        return safeReadFuture.thenCompose(unused -> 
retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
+    }
 
+    /**
+     * Extracts the exact number of entries, or fewer if the cursor becomes empty, 
from a cursor at the specific time.
+     *
+     * @param readTimestamp Timestamp of the moment when the 
data will be extracted.
+     * @param cursorId Cursor id.
+     * @param count Number of entries which will be extracted.
+     * @return Result future.
+     */
+    private CompletableFuture<ArrayList<BinaryRow>> 
retrieveExactEntriesUntilCursorEmpty(
+            HybridTimestamp readTimestamp,
+            IgniteUuid cursorId,
+            int count
+    ) {
         @SuppressWarnings("resource") PartitionTimestampCursor cursor = 
cursors.computeIfAbsent(cursorId,
                 id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
 
-        CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(readTimestamp);
+        ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new 
ArrayList<>(count);
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated 
thread pool should be used.
-        return safeReadFuture.thenApplyAsync(ignored -> {
-            while (batchRows.size() < batchCount && cursor.hasNext()) {
-                BinaryRow resolvedReadResult = 
resolveReadResult(cursor.next(), readTimestamp, () -> 
cursor.committed(readTimestamp));
+        while (resolutionFuts.size() < count && cursor.hasNext()) {
+            ReadResult readResult = cursor.next();
+            HybridTimestamp newestCommitTimestamp = 
readResult.newestCommitTimestamp();
+
+            BinaryRow candidate =
+                    newestCommitTimestamp == null || 
!readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp);
+
+            resolutionFuts.add(resolveReadResult(readResult, readTimestamp, () 
-> candidate));
+        }
+
+        return allOf(resolutionFuts.toArray(new 
CompletableFuture[0])).thenCompose(unused -> {
+            ArrayList<BinaryRow> rows = new ArrayList<>(count);
+
+            for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+                BinaryRow resolvedReadResult = resolutionFut.join();
 
                 if (resolvedReadResult != null) {
-                    batchRows.add(resolvedReadResult);
+                    rows.add(resolvedReadResult);
                 }
             }
 
-            return batchRows;
+            if (rows.size() < count && cursor.hasNext()) {
+                return retrieveExactEntriesUntilCursorEmpty(readTimestamp, 
cursorId, count - rows.size()).thenApply(binaryRows -> {
+                    rows.addAll(binaryRows);
+
+                    return rows;
+                });
+            } else {
+                return completedFuture(rows);
+            }
         });
     }
 
@@ -367,8 +410,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, 
Boolean isPrimary) {
-        BinaryRow tableRow = request.binaryRow();
+    private CompletableFuture<BinaryRow> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, 
Boolean isPrimary) {
+        BinaryRow searchRow = request.binaryRow();
         HybridTimestamp readTimestamp = request.readTimestamp();
 
         if (request.requestType() != RequestType.RO_GET) {
@@ -378,11 +421,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated 
thread pool should be used.
-        return safeReadFuture.thenApplyAsync(ignored -> {
-            //TODO: IGNITE-17868 Integrate indexes into rowIds resolution 
along with proper lock management on search rows.
-            return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) 
-> binaryRow);
-        });
+        return safeReadFuture.thenCompose(unused -> resolveRowByPk(searchRow, 
readTimestamp));
     }
 
     /**
@@ -392,7 +431,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> 
processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request, Boolean 
isPrimary) {
+    private CompletableFuture<ArrayList<BinaryRow>> 
processReadOnlyMultiEntryAction(
+            ReadOnlyMultiRowReplicaRequest request,
+            Boolean isPrimary
+    ) {
+        Collection<BinaryRow> searchRows = request.binaryRows();
+        HybridTimestamp readTimestamp = request.readTimestamp();
+
         if (request.requestType() != RequestType.RO_GET_ALL) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     format("Unknown single request [actionType={}]", 
request.requestType()));
@@ -400,17 +445,28 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated 
thread pool should be used.
-        return safeReadFuture.thenApplyAsync(ignored -> {
-            ArrayList<BinaryRow> result = new 
ArrayList<>(request.binaryRows().size());
+        return safeReadFuture.thenCompose(unused -> {
+            ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new 
ArrayList<>(searchRows.size());
 
-            for (BinaryRow searchRow : request.binaryRows()) {
-                BinaryRow row = resolveRowByPk(searchRow, 
request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+            for (BinaryRow searchRow : searchRows) {
+                CompletableFuture<BinaryRow> fut = resolveRowByPk(searchRow, 
readTimestamp);
 
-                result.add(row);
+                resolutionFuts.add(fut);
             }
 
-            return result;
+            return allOf(resolutionFuts.toArray(new 
CompletableFuture[0])).thenApply(unused1 -> {
+                ArrayList<BinaryRow> result = new 
ArrayList<>(resolutionFuts.size());
+
+                for (CompletableFuture<BinaryRow> resolutionFut : 
resolutionFuts) {
+                    BinaryRow resolvedReadResult = resolutionFut.join();
+
+                    if (resolvedReadResult != null) {
+                        result.add(resolvedReadResult);
+                    }
+                }
+
+                return result;
+            });
         });
     }
 
@@ -420,7 +476,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Request.
      * @return Future.
      */
-    private CompletionStage<Object> 
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+    private CompletionStage<Void> 
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
         return raftClient.run(new SafeTimeSyncCommand());
     }
 
@@ -489,7 +545,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Scan retrieve batch request operation.
      * @return Listener response.
      */
-    private CompletableFuture<Object> 
processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest 
request) {
+    private CompletableFuture<ArrayList<BinaryRow>> 
processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest 
request) {
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
 
@@ -525,7 +581,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return future result of the operation.
      */
     // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
-    private CompletableFuture<Object> 
processTxFinishAction(TxFinishReplicaRequest request) {
+    private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest request) {
         List<ReplicationGroupId> aggregatedGroupIds = 
request.groups().values().stream()
                 
.flatMap(List::stream).map(IgniteBiTuple::get1).collect(Collectors.toList());
 
@@ -552,7 +608,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         )
         );
 
-        return allOf(cleanupFutures).thenApply(ignored -> null);
+        return allOf(cleanupFutures);
     }
 
     /**
@@ -601,7 +657,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return CompletableFuture of void.
      */
     // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
-    private CompletableFuture processTxCleanupAction(TxCleanupReplicaRequest 
request) {
+    private CompletableFuture<Void> 
processTxCleanupAction(TxCleanupReplicaRequest request) {
         try {
             closeAllTransactionCursors(request.txId());
         } catch (Exception e) {
@@ -613,50 +669,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .thenRun(() -> 
lockManager.locks(request.txId()).forEachRemaining(lockManager::release));
     }
 
-    /**
-     * Finds the row and its identifier by given pk search row.
-     *
-     * @param tableRow A bytes representing a primary tableRow.
-     * @param ts A timestamp regarding which we need to resolve the given row.
-     * @param action An action to perform on a resolved row.
-     * @param <T> A type of the value returned by action.
-     * @return Result of the given action.
-     */
-    private <T> T resolveRowByPk(
-            BinaryRow tableRow,
-            HybridTimestamp ts,
-            BiFunction<@Nullable RowId, @Nullable BinaryRow, T> action
-    ) {
-        try (Cursor<RowId> cursor = pkIndexStorage.get().get(tableRow)) {
-            for (RowId rowId : cursor) {
-                ReadResult readResult = mvDataStorage.read(rowId, ts);
-
-                BinaryRow row = resolveReadResult(readResult, ts, () -> {
-                    if (readResult.newestCommitTimestamp() == null) {
-                        return null;
-                    }
-
-                    ReadResult committedReadResult = mvDataStorage.read(rowId, 
readResult.newestCommitTimestamp());
-
-                    assert !committedReadResult.isWriteIntent() :
-                            "The result is not committed [rowId=" + rowId + ", 
timestamp="
-                                    + readResult.newestCommitTimestamp() + ']';
-
-                    return committedReadResult.binaryRow();
-                });
-
-                if (row != null && row.hasValue()) {
-                    return action.apply(rowId, row);
-                }
-            }
-
-            return action.apply(null, null);
-        } catch (Exception e) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                    format("Unable to close cursor [tableId={}]", tableId), e);
-        }
-    }
-
     /**
      * Finds the row and its identifier by given pk search row.
      *
@@ -694,6 +706,42 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
+    /**
+     * Finds the row and its identifier by given pk search row.
+     *
+     * @param searchKey A bytes representing a primary key.
+     * @param ts A timestamp regarding which we need to resolve the given row.
+     * @return Future of the resolved row; completes with {@code null} if the index has no entry for the key.
+     */
+    private CompletableFuture<BinaryRow> resolveRowByPk(BinaryRow searchKey, 
HybridTimestamp ts) {
+        try (Cursor<RowId> cursor = pkIndexStorage.get().get(searchKey)) {
+            for (RowId rowId : cursor) {
+                ReadResult readResult = mvDataStorage.read(rowId, ts);
+
+                return resolveReadResult(readResult, ts, () -> {
+                    HybridTimestamp newestCommitTimestamp = 
readResult.newestCommitTimestamp();
+
+                    if (newestCommitTimestamp == null) {
+                        return null;
+                    }
+
+                    ReadResult committedReadResult = mvDataStorage.read(rowId, 
newestCommitTimestamp);
+
+                    assert !committedReadResult.isWriteIntent() :
+                            "The result is not committed [rowId=" + rowId + ", 
timestamp="
+                                    + newestCommitTimestamp + ']';
+
+                    return committedReadResult.binaryRow();
+                });
+            }
+
+            return completedFuture(null);
+        } catch (Exception e) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    format("Unable to close cursor [tableId={}]", tableId), e);
+        }
+    }
+
     /**
      * Tests row values for equality.
      *
@@ -1122,7 +1170,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             locks[idx++] = locker.locksForInsert(txId, tableRow, rowId);
         }
 
-        return CompletableFuture.allOf(locks);
+        return allOf(locks);
     }
 
     private CompletableFuture<?> takeRemoveLockOnIndexes(BinaryRow tableRow, 
RowId rowId, UUID txId) {
@@ -1139,7 +1187,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             locks[idx++] = locker.locksForRemove(txId, tableRow, rowId);
         }
 
-        return CompletableFuture.allOf(locks);
+        return allOf(locks);
     }
 
     /**
@@ -1193,7 +1241,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Two actions operation request.
      * @return Listener response.
      */
-    private CompletableFuture<Object> 
processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
+    private CompletableFuture<Boolean> 
processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
         BinaryRow newRow = request.binaryRow();
         BinaryRow expectedRow = request.oldBinaryRow();
         TablePartitionId commitPartitionId = request.commitPartitionId();
@@ -1278,7 +1326,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 if (expectedTerm.equals(currentTerm)) {
                                     return completedFuture(null);
                                 } else {
-                                    return CompletableFuture.failedFuture(new 
PrimaryReplicaMissException(expectedTerm, currentTerm));
+                                    return failedFuture(new 
PrimaryReplicaMissException(expectedTerm, currentTerm));
                                 }
                             }
                     );
@@ -1297,7 +1345,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Resolved binary row.
      */
     private BinaryRow resolveReadResult(ReadResult readResult, UUID txId) {
-        return resolveReadResult(readResult, txId, null, null);
+        // It is safe to join here (i.e. wait for the future result), because the 
resolution for an RW transaction cannot lead to a network request.
+        return resolveReadResult(readResult, txId, null, null).join();
     }
 
     /**
@@ -1306,10 +1355,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param readResult Read result to resolve.
      * @param timestamp Timestamp.
      * @param lastCommitted Action to get the latest committed row.
-     * @return Resolved binary row.
+     * @return Future to resolved binary row.
      */
-    private BinaryRow resolveReadResult(ReadResult readResult, HybridTimestamp 
timestamp, Supplier<BinaryRow> lastCommitted) {
-
+    private CompletableFuture<BinaryRow> resolveReadResult(
+            ReadResult readResult,
+            HybridTimestamp timestamp,
+            Supplier<BinaryRow> lastCommitted
+    ) {
         return resolveReadResult(readResult, null, timestamp, lastCommitted);
     }
 
@@ -1327,9 +1379,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param txId Nullable transaction id, should be provided if resolution 
is performed within the context of RW transaction.
      * @param timestamp Timestamp is used in RO transaction only.
      * @param lastCommitted Action to get the latest committed row, it is used 
in RO transaction only.
-     * @return Resolved binary row.
+     * @return Future to resolved binary row.
      */
-    private BinaryRow resolveReadResult(
+    private CompletableFuture<BinaryRow> resolveReadResult(
             ReadResult readResult,
             @Nullable UUID txId,
             @Nullable HybridTimestamp timestamp,
@@ -1344,7 +1396,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                 if (retrievedResultTxId == null || 
txId.equals(retrievedResultTxId)) {
                     // Same transaction - return retrieved value. It may be 
both writeIntent or regular value.
-                    return readResult.binaryRow();
+                    return completedFuture(readResult.binaryRow());
                 } else {
                     // Should never happen, currently, locks prevent reading 
another transaction intents during RW requests.
                     throw new AssertionError("Mismatched transaction id, 
expectedTxId={" + txId + "},"
@@ -1352,14 +1404,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
             } else {
                 if (!readResult.isWriteIntent()) {
-                    return readResult.binaryRow();
+                    return completedFuture(readResult.binaryRow());
                 }
 
                 CompletableFuture<BinaryRow> writeIntentResolutionFut = 
resolveWriteIntentAsync(
                         readResult, timestamp, lastCommitted);
 
                 // RO request.
-                return writeIntentResolutionFut.join();
+                return writeIntentResolutionFut;
             }
         }
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 0d9cc2d77b..844dab2eb5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -90,7 +90,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
     );
 
     /** Table ID test value. */
-    public static final UUID tableId2 = java.util.UUID.randomUUID();
+    public static final UUID tableId2 = UUID.randomUUID();
 
     protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
             1,
@@ -1798,7 +1798,6 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         assertEquals(100., accounts.recordView().get(readOnlyTx, 
makeKey(1)).doubleValue("balance"));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967")
     @Test
     public void testReadOnlyGetWriteIntentResolutionUpdate() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1822,7 +1821,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         assertEquals(300., accounts.recordView().get(readOnlyTx2, 
makeKey(1)).doubleValue("balance"));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967, 
https://issues.apache.org/jira/browse/IGNITE-17968")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17968")
     @Test
     public void testReadOnlyGetWriteIntentResolutionRemove() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1857,7 +1856,6 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         validateBalance(retrievedKeys, 100., 200.);
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967")
     // TODO: IGNITE-17968 Remove after fix.
     @Test
     public void testReadOnlyPendingWriteIntentSkipped() {
@@ -1876,14 +1874,14 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         tx.commit();
 
         Collection<Tuple> retrievedKeys2 = 
accounts.recordView().getAll(readOnlyTx, List.of(makeKey(1), makeKey(2)));
-        validateBalance(retrievedKeys2, 100., 300.);
+        validateBalance(retrievedKeys2, 100., 200.);
 
         Transaction readOnlyTx2 = igniteTransactions.readOnly().begin();
         Collection<Tuple> retrievedKeys3 = 
accounts.recordView().getAll(readOnlyTx2, List.of(makeKey(1), makeKey(2)));
         validateBalance(retrievedKeys3, 100., 300.);
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967, 
https://issues.apache.org/jira/browse/IGNITE-17968")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17968")
     @Test
     public void testReadOnlyPendingWriteIntentSkippedCombined() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 0c6a57dce7..9809044912 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.tx.LockManager;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.Table;
@@ -64,6 +67,7 @@ public class TxLocalTest extends TxAbstractTest {
         lockManager = new HeapLockManager();
 
         ReplicaService replicaSvc = mock(ReplicaService.class, 
RETURNS_DEEP_STUBS);
+        PlacementDriver placementDriver = mock(PlacementDriver.class, 
RETURNS_DEEP_STUBS);
 
         Map<ReplicationGroupId, DummyInternalTableImpl> tables = new 
HashMap<>();
 
@@ -77,15 +81,22 @@ public class TxLocalTest extends TxAbstractTest {
             }
         ).when(replicaSvc).invoke(any(), any());
 
+        doAnswer(invocationOnMock -> {
+            TxStateReplicaRequest request = invocationOnMock.getArgument(1);
+
+            return CompletableFuture.completedFuture(
+                    
tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
+        }).when(placementDriver).sendMetaRequest(any(), any());
+
         txManager = new TxManagerImpl(replicaSvc, lockManager, new 
HybridClockImpl());
 
         igniteTransactions = new IgniteTransactionsImpl(txManager);
 
-        DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, 
txManager, true);
+        DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, 
txManager, true, placementDriver);
 
         accounts = new TableImpl(table, new 
DummySchemaManagerImpl(ACCOUNTS_SCHEMA), lockManager);
 
-        DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, 
txManager, true);
+        DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, 
txManager, true, placementDriver);
 
         customers = new TableImpl(table2, new 
DummySchemaManagerImpl(CUSTOMERS_SCHEMA), lockManager);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3f202bb5f0..4cff2aea18 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -50,14 +50,14 @@ import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
-import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateTableStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -104,9 +104,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
      * @param txManager Transaction manager.
      * @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of ReplicaService
      *                        by itself.
+     * @param placementDriver Placement driver.
      */
-    public DummyInternalTableImpl(ReplicaService replicaSvc, TxManager txManager, boolean crossTableUsage) {
-        this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage);
+    public DummyInternalTableImpl(
+            ReplicaService replicaSvc,
+            TxManager txManager,
+            boolean crossTableUsage,
+            PlacementDriver placementDriver
+    ) {
+        this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage, placementDriver);
     }
 
     /**
@@ -116,7 +122,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
      * @param mvPartStorage Multi version partition storage.
      */
     public DummyInternalTableImpl(ReplicaService replicaSvc, MvPartitionStorage mvPartStorage) {
-        this(replicaSvc, mvPartStorage, null, false);
+        this(replicaSvc, mvPartStorage, null, false, null);
     }
 
     /**
@@ -127,12 +133,14 @@ public class DummyInternalTableImpl extends InternalTableImpl {
      * @param txManager Transaction manager, if {@code null}, then default one will be created.
      * @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of ReplicaService
      *                        by itself.
+     * @param placementDriver Placement driver.
      */
     public DummyInternalTableImpl(
             ReplicaService replicaSvc,
             MvPartitionStorage mvPartStorage,
             @Nullable TxManager txManager,
-            boolean crossTableUsage
+            boolean crossTableUsage,
+            PlacementDriver placementDriver
     ) {
         super(
                 "test",
@@ -143,7 +151,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 addr -> Mockito.mock(ClusterNode.class),
                 txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()) : txManager,
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestConcurrentHashMapTxStateTableStorage(),
                 replicaSvc,
                 new HybridClockImpl()
         );
@@ -242,15 +250,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 pkStorage,
                 clock,
                 new PendingComparableValuesTracker<>(clock.now()),
+                txStateStorage().getOrCreateTxStateStorage(0),
                 null,
-                null,
-                null,
+                placementDriver,
                 peer -> true
         );
 
         partitionListener = new PartitionListener(
                 mvPartStorage,
-                new TestConcurrentHashMapTxStateStorage(),
+                txStateStorage().getOrCreateTxStateStorage(0),
                 this.txManager,
                 () -> Map.of(pkStorage.get().id(), pkStorage.get())
         );


Reply via email to