This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9a1dd09170 IGNITE-19113 Refactoring after implementing index building
(#1858)
9a1dd09170 is described below
commit 9a1dd09170a856b9858844ad62c3237bb0aa7a43
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Mar 29 17:59:37 2023 +0300
IGNITE-19113 Refactoring after implementing index building (#1858)
---
.../ignite/raft/jraft/disruptor/StripedDisruptor.java | 4 +++-
.../distributed/ItAbstractInternalTableScanTest.java | 7 ++++---
.../distributed/ItInternalTableReadOnlyOperationsTest.java | 4 ++++
.../distributed/ItInternalTableReadOnlyScanTest.java | 7 +------
.../distributed/ItInternalTableReadWriteScanTest.java | 2 --
.../table/distributed/raft/PartitionDataStorage.java | 11 +++++++++++
.../internal/table/distributed/raft/PartitionListener.java | 14 ++++++--------
.../outgoing/SnapshotAwarePartitionDataStorage.java | 6 ++++++
.../ignite/distributed/TestPartitionDataStorage.java | 6 ++++++
9 files changed, 41 insertions(+), 20 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 48302edaf8..2c2665859e 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -269,7 +269,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
/** {@inheritDoc} */
@Override public void handleEventException(Throwable ex, long
sequence, T event) {
- BiConsumer<T, Throwable> handler = subscribers.get(event.nodeId());
+ NodeId nodeId = event.nodeId();
+
+ BiConsumer<T, Throwable> handler = nodeId == null ? null :
subscribers.get(nodeId);
LOG.error("Handle disruptor event error [name={}, event={},
hasHandler={}]", ex, name, event, handler != null);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 27faf4d3ac..ef94a092a0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -66,7 +66,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
/**
@@ -87,14 +86,16 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
/** Internal table to test. */
DummyInternalTableImpl internalTbl;
- private final HybridClock clock = new HybridClockImpl();
+ final HybridClock clock = new HybridClockImpl();
/**
* Prepare test environment using DummyInternalTableImpl and Mocked
storage.
*/
@BeforeEach
public void setUp(TestInfo testInfo) {
- internalTbl = new
DummyInternalTableImpl(Mockito.mock(ReplicaService.class), mockStorage,
ROW_SCHEMA);
+
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
+
+ internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class),
mockStorage, ROW_SCHEMA);
}
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 8830ea03c7..576e1f9a4b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -47,6 +48,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.table.InternalTable;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
@@ -101,6 +103,8 @@ public class ItInternalTableReadOnlyOperationsTest extends
IgniteAbstractTest {
*/
@BeforeEach
public void setUp(TestInfo testInfo) {
+
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
+
internalTbl = new DummyInternalTableImpl(replicaService, mockStorage,
SCHEMA);
lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 15433e7254..0e2a5bf2c3 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.distributed;
import static org.mockito.Mockito.mock;
import java.util.concurrent.Flow.Publisher;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -35,12 +33,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public class ItInternalTableReadOnlyScanTest extends
ItAbstractInternalTableScanTest {
- private static final HybridClock CLOCK = new HybridClockImpl();
-
- /** {@inheritDoc} */
@Override
protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
- return internalTbl.scan(part, CLOCK.now(), mock(ClusterNode.class));
+ return internalTbl.scan(part, clock.now(), mock(ClusterNode.class));
}
// TODO: IGNITE-17666 Use super test as is.
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 3f1fd642cc..26581536bd 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.network.ClusterNode;
* Tests for {@link InternalTable#scan(int,
org.apache.ignite.internal.tx.InternalTransaction)}.
*/
public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableScanTest {
- /** {@inheritDoc} */
@Override
protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
if (tx == null) {
@@ -45,7 +44,6 @@ public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableSca
return internalTbl.scan(part, tx.id(), recipient, null, null, null, 0,
null);
}
- /** {@inheritDoc} */
@Override
protected InternalTransaction startTx() {
InternalTransaction tx = internalTbl.txManager().begin();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index aa6347bb89..4d2fc32f17 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -186,4 +187,14 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
*/
@Override
void close();
+
+ /**
+ * Scans the partition and returns a cursor of values at the given
timestamp. This cursor filters out committed tombstones, but not
+ * tombstones in the write-intent state.
+ *
+ * @param timestamp Timestamp. Can't be {@code null}.
+ * @return Cursor.
+ * @throws StorageException If failed to read data from the storage.
+ */
+ PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 8b0534a188..04c6fb16e8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -106,14 +106,12 @@ public class PartitionListener implements
RaftGroupListener {
this.storageIndexTracker = storageIndexTracker;
// TODO: IGNITE-18502 Implement a pending update storage
- try (PartitionTimestampCursor cursor =
partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
- if (cursor != null) {
- while (cursor.hasNext()) {
- ReadResult readResult = cursor.next();
-
- if (readResult.isWriteIntent()) {
-
txsPendingRowIds.computeIfAbsent(readResult.transactionId(), key -> new
HashSet()).add(readResult.rowId());
- }
+ try (PartitionTimestampCursor cursor =
partitionDataStorage.scan(HybridTimestamp.MAX_VALUE)) {
+ while (cursor.hasNext()) {
+ ReadResult readResult = cursor.next();
+
+ if (readResult.isWriteIntent()) {
+
txsPendingRowIds.computeIfAbsent(readResult.transactionId(), key -> new
HashSet<>()).add(readResult.rowId());
}
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 28161e3c0a..458a3dae26 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -196,4 +197,9 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
public MvPartitionStorage getStorage() {
return partitionStorage;
}
+
+ @Override
+ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException {
+ return partitionStorage.scan(timestamp);
+ }
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 6ae7e89a24..cffd10c270 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -125,4 +126,9 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
@Override
public void close() {
}
+
+ @Override
+ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException {
+ return partitionStorage.scan(timestamp);
+ }
}