This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a1dd09170 IGNITE-19113 Refactoring after implementing index building (#1858)
9a1dd09170 is described below

commit 9a1dd09170a856b9858844ad62c3237bb0aa7a43
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Mar 29 17:59:37 2023 +0300

    IGNITE-19113 Refactoring after implementing index building (#1858)
---
 .../ignite/raft/jraft/disruptor/StripedDisruptor.java      |  4 +++-
 .../distributed/ItAbstractInternalTableScanTest.java       |  7 ++++---
 .../distributed/ItInternalTableReadOnlyOperationsTest.java |  4 ++++
 .../distributed/ItInternalTableReadOnlyScanTest.java       |  7 +------
 .../distributed/ItInternalTableReadWriteScanTest.java      |  2 --
 .../table/distributed/raft/PartitionDataStorage.java       | 11 +++++++++++
 .../internal/table/distributed/raft/PartitionListener.java | 14 ++++++--------
 .../outgoing/SnapshotAwarePartitionDataStorage.java        |  6 ++++++
 .../ignite/distributed/TestPartitionDataStorage.java       |  6 ++++++
 9 files changed, 41 insertions(+), 20 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 48302edaf8..2c2665859e 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -269,7 +269,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
 
         /** {@inheritDoc} */
         @Override public void handleEventException(Throwable ex, long 
sequence, T event) {
-            BiConsumer<T, Throwable> handler = subscribers.get(event.nodeId());
+            NodeId nodeId = event.nodeId();
+
+            BiConsumer<T, Throwable> handler = nodeId == null ? null : 
subscribers.get(nodeId);
 
             LOG.error("Handle disruptor event error [name={}, event={}, 
hasHandler={}]", ex, name, event, handler != null);
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 27faf4d3ac..ef94a092a0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -66,7 +66,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
@@ -87,14 +86,16 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
     /** Internal table to test. */
     DummyInternalTableImpl internalTbl;
 
-    private final HybridClock clock = new HybridClockImpl();
+    final HybridClock clock = new HybridClockImpl();
 
     /**
      * Prepare test environment using DummyInternalTableImpl and Mocked 
storage.
      */
     @BeforeEach
     public void setUp(TestInfo testInfo) {
-        internalTbl = new 
DummyInternalTableImpl(Mockito.mock(ReplicaService.class), mockStorage, 
ROW_SCHEMA);
+        
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
+
+        internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class), 
mockStorage, ROW_SCHEMA);
     }
 
     /**
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 8830ea03c7..576e1f9a4b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -47,6 +48,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.table.InternalTable;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
@@ -101,6 +103,8 @@ public class ItInternalTableReadOnlyOperationsTest extends 
IgniteAbstractTest {
      */
     @BeforeEach
     public void setUp(TestInfo testInfo) {
+        
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
+
         internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, 
SCHEMA);
 
         lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 15433e7254..0e2a5bf2c3 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.distributed;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.Flow.Publisher;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -35,12 +33,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
  */
 @ExtendWith(MockitoExtension.class)
 public class ItInternalTableReadOnlyScanTest extends 
ItAbstractInternalTableScanTest {
-    private static final HybridClock CLOCK = new HybridClockImpl();
-
-    /** {@inheritDoc} */
     @Override
     protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
-        return internalTbl.scan(part, CLOCK.now(), mock(ClusterNode.class));
+        return internalTbl.scan(part, clock.now(), mock(ClusterNode.class));
     }
 
     // TODO: IGNITE-17666 Use super test as is.
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 3f1fd642cc..26581536bd 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.network.ClusterNode;
  * Tests for {@link InternalTable#scan(int, 
org.apache.ignite.internal.tx.InternalTransaction)}.
  */
 public class ItInternalTableReadWriteScanTest extends 
ItAbstractInternalTableScanTest {
-    /** {@inheritDoc} */
     @Override
     protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
         if (tx == null) {
@@ -45,7 +44,6 @@ public class ItInternalTableReadWriteScanTest extends 
ItAbstractInternalTableSca
         return internalTbl.scan(part, tx.id(), recipient, null, null, null, 0, 
null);
     }
 
-    /** {@inheritDoc} */
     @Override
     protected InternalTransaction startTx() {
         InternalTransaction tx = internalTbl.txManager().begin();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index aa6347bb89..4d2fc32f17 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -186,4 +187,14 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      */
     @Override
     void close();
+
+    /**
+     * Scans the partition and returns a cursor of values at the given 
timestamp. This cursor filters out committed tombstones, but not
+     * tombstones in the write-intent state.
+     *
+     * @param timestamp Timestamp. Can't be {@code null}.
+     * @return Cursor.
+     * @throws StorageException If failed to read data from the storage.
+     */
+    PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException;
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 8b0534a188..04c6fb16e8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -106,14 +106,12 @@ public class PartitionListener implements 
RaftGroupListener {
         this.storageIndexTracker = storageIndexTracker;
 
         // TODO: IGNITE-18502 Implement a pending update storage
-        try (PartitionTimestampCursor cursor = 
partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
-            if (cursor != null) {
-                while (cursor.hasNext()) {
-                    ReadResult readResult = cursor.next();
-
-                    if (readResult.isWriteIntent()) {
-                        
txsPendingRowIds.computeIfAbsent(readResult.transactionId(), key -> new 
HashSet()).add(readResult.rowId());
-                    }
+        try (PartitionTimestampCursor cursor = 
partitionDataStorage.scan(HybridTimestamp.MAX_VALUE)) {
+            while (cursor.hasNext()) {
+                ReadResult readResult = cursor.next();
+
+                if (readResult.isWriteIntent()) {
+                    
txsPendingRowIds.computeIfAbsent(readResult.transactionId(), key -> new 
HashSet<>()).add(readResult.rowId());
                 }
             }
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 28161e3c0a..458a3dae26 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -196,4 +197,9 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
     public MvPartitionStorage getStorage() {
         return partitionStorage;
     }
+
+    @Override
+    public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException {
+        return partitionStorage.scan(timestamp);
+    }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 6ae7e89a24..cffd10c270 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -125,4 +126,9 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
     @Override
     public void close() {
     }
+
+    @Override
+    public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException {
+        return partitionStorage.scan(timestamp);
+    }
 }

Reply via email to