This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3812e3a IGNITE-15434 Reactive scan for table partitions.
3812e3a is described below
commit 3812e3a6543ba9f1ab62272b5a7967c2246d1949
Author: sanpwc <[email protected]>
AuthorDate: Wed Sep 29 19:36:43 2021 +0300
IGNITE-15434 Reactive scan for table partitions.
Signed-off-by: Andrey Gura <[email protected]>
---
.../ignite/client/fakes/FakeInternalTable.java | 8 +
.../server/raft/MetaStorageListener.java | 2 +-
.../distributed/ITInternalTableScanTest.java | 529 +++++++++++++++++++++
.../ignite/internal/table/InternalTable.java | 12 +-
.../command/response/MultiRowsResponse.java | 12 +-
.../distributed/command/scan/ScanCloseCommand.java | 46 ++
.../distributed/command/scan/ScanInitCommand.java | 59 +++
.../command/scan/ScanRetrieveBatchCommand.java | 60 +++
.../table/distributed/raft/PartitionListener.java | 135 ++++++
.../distributed/storage/InternalTableImpl.java | 183 +++++++
.../table/impl/DummyInternalTableImpl.java | 9 +
11 files changed, 1047 insertions(+), 8 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 18e5917..918cc46 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -22,8 +22,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.schema.definition.SchemaManagementMode;
import org.apache.ignite.tx.Transaction;
@@ -224,4 +227,9 @@ public class FakeInternalTable implements InternalTable {
return CompletableFuture.completedFuture(skipped);
}
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Publisher<BinaryRow> scan(int p, @Nullable
Transaction tx) {
+ throw new IgniteInternalException(new
OperationNotSupportedException());
+ }
}
diff --git
a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 98652bb..f52aaff 100644
---
a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -281,7 +281,7 @@ public class MetaStorageListener implements
RaftGroupListener {
else if (clo.command() instanceof CursorCloseCommand) {
CursorCloseCommand cursorCloseCmd = (CursorCloseCommand)
clo.command();
- CursorMeta cursorDesc = cursors.get(cursorCloseCmd.cursorId());
+ CursorMeta cursorDesc =
cursors.remove(cursorCloseCmd.cursorId());
if (cursorDesc == null) {
clo.result(null);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
new file mode 100644
index 0000000..59a1113
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link InternalTable#scan(int, org.apache.ignite.tx.Transaction)}
+ */
+@ExtendWith(MockitoExtension.class)
+public class ITInternalTableScanTest {
+ /** */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new
TestScaleCubeClusterServiceFactory();
+
+ /** */
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY =
new MessageSerializationRegistryImpl();
+
+ /** */
+ private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
+
+ /** */
+ private static final String TEST_TABLE_NAME = "testTbl";
+
+ /** Mock partition storage. */
+ @Mock
+ private Storage mockStorage;
+
+ /** */
+ private ClusterService network;
+
+ /** */
+ private RaftServer raftSrv;
+
+ /** Internal table to test. */
+ private InternalTable internalTbl;
+
+ /**
+ * Prepare test environment:
+ * <ol>
+ * <li>Start network node.</li>
+ * <li>Start raft server.</li>
+ * <li>Prepare partitioned raft group.</li>
+ * <li>Prepare partitioned raft group service.</li>
+ * <li>Prepare internal table as a test object.</li>
+ * </ol>
+ *
+ * @throws Exception If any.
+ */
+ @BeforeEach
+ public void setUp(TestInfo testInfo) throws Exception {
+ NetworkAddress nodeNetworkAddress = new NetworkAddress("localhost",
20_000);
+
+ network = ClusterServiceTestUtils.clusterService(
+ testInfo,
+ 20_000,
+ new StaticNodeFinder(List.of(nodeNetworkAddress)),
+ SERIALIZATION_REGISTRY,
+ NETWORK_FACTORY
+ );
+
+ network.start();
+
+ raftSrv = new RaftServerImpl(network, FACTORY);
+
+ raftSrv.start();
+
+ String grpName = "test_part_grp";
+
+ List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
+
+ mockStorage = mock(Storage.class);
+
+ raftSrv.startRaftGroup(
+ grpName,
+ new PartitionListener(mockStorage),
+ conf
+ );
+
+ RaftGroupService raftGrpSvc = RaftGroupServiceImpl.start(
+ grpName,
+ network,
+ FACTORY,
+ 10_000,
+ conf,
+ true,
+ 200
+ ).get(3, TimeUnit.SECONDS);
+
+ internalTbl = new InternalTableImpl(
+ TEST_TABLE_NAME,
+ new IgniteUuidGenerator(UUID.randomUUID(), 0).randomUuid(),
+ Map.of(0, raftGrpSvc),
+ 1
+ );
+ }
+
+ /**
+ * Cleanup previously started network and raft server.
+ *
+ * @throws Exception If failed to stop component.
+ */
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (raftSrv != null)
+ raftSrv.beforeNodeStop();
+
+ if (network != null)
+ network.beforeNodeStop();
+
+ if (raftSrv != null)
+ raftSrv.stop();
+
+ if (network != null)
+ network.stop();
+ }
+
+ /**
+ * Checks whether publisher provides all existing data and then completes
if requested by one row at a time.
+ */
+ @Test
+ public void testOneRowScan() throws Exception {
+ requestNTest(
+ List.of(
+ prepareDataRow("key1", "val1"),
+ prepareDataRow("key2", "val2")
+ ),
+ 1);
+ }
+
+ /**
+ * Checks whether publisher provides all existing data and then completes
if requested by multiple rows at a time.
+ */
+ @Test
+ public void testMultipleRowScan() throws Exception {
+ requestNTest(
+ List.of(
+ prepareDataRow("key1", "val1"),
+ prepareDataRow("key2", "val2"),
+ prepareDataRow("key3", "val3"),
+ prepareDataRow("key4", "val4"),
+ prepareDataRow("key5", "val5")
+ ),
+ 2);
+ }
+
+ /**
+ * Checks whether {@link IllegalArgumentException} is thrown and inner
storage cursor is closes in case of invalid
+ * requested amount of items.
+ *
+ * @throws Exception If any.
+ */
+ @Test()
+ public void testInvalidRequestedAmountScan() throws Exception {
+ AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+ when(mockStorage.scan(any())).thenAnswer(invocation -> {
+ var cursor = mock(Cursor.class);
+
+ doAnswer(
+ invocationClose -> {
+ cursorClosed.set(true);
+ return null;
+ }
+ ).when(cursor).close();
+
+ when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+ throw new StorageException("test");
+ });
+
+ return cursor;
+ });
+
+ for (long n : new long[] {-1, 0}) {
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ cursorClosed.set(false);
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+ @Override public void onSubscribe(Subscription subscription) {
+ subscription.request(n);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null,
1_000));
+
+ assertTrue(waitForCondition(cursorClosed::get, 1_000));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ throw gotException.get();
+ }
+ );
+ }
+ }
+
+ /**
+ * Checks that exception from storage cursors has next properly propagates
to subscriber.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+ @Test
+ public void testExceptionRowScanCursorHasNext() throws Exception {
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+ when(mockStorage.scan(any())).thenAnswer(invocation -> {
+ var cursor = mock(Cursor.class);
+
+ when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+ throw new StorageException("test");
+ });
+
+ doAnswer(
+ invocationClose -> {
+ cursorClosed.set(true);
+ return null;
+ }
+ ).when(cursor).close();
+
+ return cursor;
+ });
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+ @Override public void onSubscribe(Subscription subscription) {
+ subscription.request(1);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+ assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
+
+ assertTrue(waitForCondition(cursorClosed::get, 1_000));
+ }
+
+ /**
+ * Checks that exception from storage cursor creation properly propagates
to subscriber.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+ @Test
+ public void testExceptionRowScan() throws Exception {
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ when(mockStorage.scan(any())).thenThrow(new StorageException("Some
storage exception"));
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+ @Override public void onSubscribe(Subscription subscription) {
+ subscription.request(1);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+ assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
+ }
+
+
+ /**
+ * Checks that {@link IllegalArgumentException} is thrown in case of
invalid partition.
+ */
+ @Test()
+ public void testInvalidPartitionParameterScan() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> internalTbl.scan(-1, null)
+ );
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> internalTbl.scan(1, null)
+ );
+ }
+
+ /**
+ * Checks that in case of second subscription {@link
IllegalStateException} will be fires to onError.
+ *
+ * @throws Exception If any.
+ */
+ @Test
+ public void testSecondSubscriptionFiresIllegalStateException() throws
Exception {
+ Flow.Publisher<BinaryRow> scan = internalTbl.scan(0, null);
+
+ scan.subscribe(new Subscriber<>() {
+ @Override public void onSubscribe(Subscription subscription) {
+
+ }
+
+ @Override public void onNext(BinaryRow item) {
+
+ }
+
+ @Override public void onError(Throwable throwable) {
+
+ }
+
+ @Override public void onComplete() {
+
+ }
+ });
+
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ scan.subscribe(new Subscriber<>() {
+ @Override public void onSubscribe(Subscription subscription) {
+
+ }
+
+ @Override public void onNext(BinaryRow item) {
+
+ }
+
+ @Override public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ }
+
+ @Override public void onComplete() {
+
+ }
+ });
+
+ assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+ assertEquals(gotException.get().getClass(),
IllegalStateException.class);
+ }
+
+ /**
+ * Checks that {@link NullPointerException} is thrown in case of null
subscription.
+ */
+ @Test
+ public void testNullPointerExceptionIsThrownInCaseOfNullSubscription() {
+ assertThrows(
+ NullPointerException.class,
+ () -> internalTbl.scan(0, null).subscribe(null)
+ );
+ }
+
+ /**
+ * Helper method to convert key and value to {@link DataRow}.
+ *
+ * @param entryKey Key.
+ * @param entryVal Value
+ * @return {@link DataRow} based on given key and value.
+ * @throws java.io.IOException If failed to close output stream that was
used to convertation.
+ */
+ private static @NotNull DataRow prepareDataRow(@NotNull String entryKey,
+ @NotNull String entryVal) throws IOException {
+ byte[] keyBytes = ByteUtils.toBytes(entryKey);
+
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
{
+ outputStream.write(keyBytes);
+ outputStream.write(ByteUtils.toBytes(entryVal));
+
+ return new SimpleDataRow(keyBytes, outputStream.toByteArray());
+ }
+ }
+
+ /**
+ * Checks whether publisher provides all existing data and then completes
if requested by reqAmount rows at a time.
+ *
+ * @param submittedItems Items to be pushed by ublisher.
+ * @param reqAmount Amount of rows to request at a time.
+ * @throws Exception If Any.
+ */
+ private void requestNTest(List<DataRow> submittedItems, int reqAmount)
throws Exception {
+ AtomicInteger cursorTouchCnt = new AtomicInteger(0);
+
+ List<BinaryRow> retrievedItems = Collections.synchronizedList(new
ArrayList<>());
+
+ when(mockStorage.scan(any())).thenAnswer(invocation -> {
+ var cursor = mock(Cursor.class);
+
+ when(cursor.hasNext()).thenAnswer(hnInvocation ->
cursorTouchCnt.get() < submittedItems.size());
+
+ when(cursor.next()).thenAnswer(nInvocation ->
submittedItems.get(cursorTouchCnt.getAndIncrement()));
+
+ return cursor;
+ });
+
+ AtomicBoolean noMoreData = new AtomicBoolean(false);
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+ private Subscription subscription;
+
+ @Override public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+
+ subscription.request(reqAmount);
+ }
+
+ @Override public void onNext(BinaryRow item) {
+ retrievedItems.add(item);
+
+ if (retrievedItems.size() % reqAmount == 0)
+ subscription.request(reqAmount);
+ }
+
+ @Override public void onError(Throwable throwable) {
+ fail("onError call is not expected.");
+ }
+
+ @Override public void onComplete() {
+ noMoreData.set(true);
+ }
+ });
+
+ assertTrue(waitForCondition(() -> retrievedItems.size() ==
submittedItems.size(), 2_000));
+
+ List<byte[]> expItems =
submittedItems.stream().map(DataRow::valueBytes).collect(Collectors.toList());
+ List<byte[]> gotItems =
retrievedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList());
+
+ for (int i = 0; i < expItems.size(); i++)
+ assertTrue(Arrays.equals(expItems.get(i), gotItems.get(i)));
+
+ assertTrue(noMoreData.get(), "More data is not expected.");
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index f6faafb..6466769 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.schema.definition.SchemaManagementMode;
@@ -195,5 +196,14 @@ public interface InternalTable {
CompletableFuture<Collection<BinaryRow>>
deleteAllExact(Collection<BinaryRow> rows,
@Nullable Transaction tx);
- //TODO: IGNTIE-14488. Add invoke() methods.
+ /**
+ * Scans the given partition, providing a {@link Publisher} of {@link BinaryRow}s that reactively notifies
+ * about partition rows.
+ *
+ * <p>NOTE(review): the integration test for {@code InternalTableImpl} expects {@link IllegalArgumentException}
+ * for an out-of-range partition; confirm whether this is part of the interface contract before adding @throws.
+ *
+ * @param p The partition.
+ * @param tx The transaction, may be {@code null}.
+ * @return {@link Publisher} that reactively notifies about partition rows.
+ */
+ @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
+
+ //TODO: IGNITE-14488. Add invoke() methods.
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
index 7c9d969..2cd6097 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.table.distributed.command.response;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.distributed.command.CommandUtils;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
@@ -36,7 +36,7 @@ import
org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
*/
public class MultiRowsResponse implements Serializable {
/** Binary rows. */
- private transient Collection<BinaryRow> rows;
+ private transient List<BinaryRow> rows;
/*
* Row bytes.
@@ -50,7 +50,7 @@ public class MultiRowsResponse implements Serializable {
*
* @param rows Collection of binary rows.
*/
- public MultiRowsResponse(Collection<BinaryRow> rows) {
+ public MultiRowsResponse(List<BinaryRow> rows) {
this.rows = rows;
CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
@@ -59,9 +59,9 @@ public class MultiRowsResponse implements Serializable {
/**
* @return Binary rows.
*/
- public Collection<BinaryRow> getValues() {
+ public List<BinaryRow> getValues() {
if (rows == null && rowsBytes != null) {
- rows = new HashSet<>();
+ rows = new ArrayList<>();
CommandUtils.readRows(rowsBytes, rows::add);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
new file mode 100644
index 0000000..db3cd58
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.scan;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Write command for {@code PartitionListener} asking it to close the server-side scan with the given id.
+ */
+public class ScanCloseCommand implements WriteCommand {
+    /** Identifier of the scan this command targets. */
+    @NotNull
+    private final IgniteUuid scanId;
+
+    /**
+     * Creates a close command for the given scan.
+     *
+     * @param scanId Identifier of the scan to close.
+     */
+    public ScanCloseCommand(@NotNull IgniteUuid scanId) {
+        this.scanId = scanId;
+    }
+
+    /**
+     * Returns the identifier of the scan this command targets.
+     *
+     * @return Scan identifier.
+     */
+    @NotNull
+    public IgniteUuid scanId() {
+        return scanId;
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
new file mode 100644
index 0000000..9cc397c
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.scan;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Write command for {@code PartitionListener} that opens a server-side scan, which is afterwards iterated
+ * through batch-retrieval commands.
+ */
+public class ScanInitCommand implements WriteCommand {
+    /** Identifier of the node that requested the scan. */
+    @NotNull
+    private final String requesterNodeId;
+
+    /** Identifier of the scan this command targets. */
+    @NotNull
+    private final IgniteUuid scanId;
+
+    /**
+     * Creates a scan init command.
+     *
+     * @param requesterNodeId Identifier of the node that requested the scan.
+     * @param scanId Identifier of the scan to open.
+     */
+    public ScanInitCommand(@NotNull String requesterNodeId, @NotNull IgniteUuid scanId) {
+        this.scanId = scanId;
+        this.requesterNodeId = requesterNodeId;
+    }
+
+    /**
+     * Returns the identifier of the node that requested the scan.
+     *
+     * @return Requester node identifier.
+     */
+    @NotNull
+    public String requesterNodeId() {
+        return requesterNodeId;
+    }
+
+    /**
+     * Returns the identifier of the scan this command targets.
+     *
+     * @return Scan identifier.
+     */
+    @NotNull
+    public IgniteUuid scanId() {
+        return scanId;
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanRetrieveBatchCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanRetrieveBatchCommand.java
new file mode 100644
index 0000000..7cfd882
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanRetrieveBatchCommand.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.scan;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Write command for {@code PartitionListener} that fetches the next batch of rows from a server-side scan
+ * previously opened with {@link ScanInitCommand}.
+ */
+public class ScanRetrieveBatchCommand implements WriteCommand {
+    /** Maximum number of rows to fetch in this batch. */
+    private final int itemsToRetrieveCnt;
+
+    /** Identifier of the scan this command targets. */
+    @NotNull
+    private final IgniteUuid scanId;
+
+    /**
+     * Creates a batch retrieval command.
+     *
+     * @param itemsToRetrieveCnt Maximum number of rows to fetch.
+     * @param scanId Identifier of the scan to fetch from.
+     */
+    public ScanRetrieveBatchCommand(int itemsToRetrieveCnt, @NotNull IgniteUuid scanId) {
+        this.scanId = scanId;
+        this.itemsToRetrieveCnt = itemsToRetrieveCnt;
+    }
+
+    /**
+     * Returns the maximum number of rows to fetch in this batch.
+     *
+     * @return Amount of items to retrieve.
+     */
+    public int itemsToRetrieveCount() {
+        return itemsToRetrieveCnt;
+    }
+
+    /**
+     * Returns the identifier of the scan this command targets.
+     *
+     * @return Scan identifier.
+     */
+    @NotNull
+    public IgniteUuid scanId() {
+        return scanId;
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index dc816a4..0301fb4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.table.distributed.raft;
import java.nio.ByteBuffer;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -29,6 +33,7 @@ import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
@@ -52,7 +57,12 @@ import
org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
@@ -68,6 +78,9 @@ public class PartitionListener implements RaftGroupListener {
/** Partition storage. */
private final Storage storage;
+ /** Storage cursors of scans opened by {@link ScanInitCommand}, keyed by scan id. */
+ private final Map<IgniteUuid, CursorMeta> cursors;
+
/**
* Constructor.
*
@@ -75,6 +88,7 @@ public class PartitionListener implements RaftGroupListener {
*/
public PartitionListener(Storage storage) {
this.storage = storage;
+ // Scan cursors keyed by scan id; entries are registered by handleScanInitCommand.
+ this.cursors = new ConcurrentHashMap<>();
}
/** {@inheritDoc} */
@@ -120,6 +134,12 @@ public class PartitionListener implements
RaftGroupListener {
handleGetAndReplaceCommand((CommandClosure<GetAndReplaceCommand>) clo);
else if (command instanceof GetAndUpsertCommand)
handleGetAndUpsertCommand((CommandClosure<GetAndUpsertCommand>) clo);
+ else if (command instanceof ScanInitCommand)
+ handleScanInitCommand((CommandClosure<ScanInitCommand>) clo);
+ else if (command instanceof ScanRetrieveBatchCommand)
+
handleScanRetrieveBatchCommand((CommandClosure<ScanRetrieveBatchCommand>) clo);
+ else if (command instanceof ScanCloseCommand)
+ handleScanCloseCommand((CommandClosure<ScanCloseCommand>) clo);
else
assert false : "Command was not found [cmd=" + command + ']';
});
@@ -417,6 +437,82 @@ public class PartitionListener implements
RaftGroupListener {
clo.result(new SingleRowResponse(response));
}
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * <p>Opens a storage cursor and registers it under the command's scan id together with the requester node id,
+     * so that later {@link ScanRetrieveBatchCommand} and {@link ScanCloseCommand} handlers can look it up.
+     * The closure is completed with {@code null} on success, or with the {@link StorageException} if opening the
+     * cursor failed.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        try {
+            // Always-true predicate: presumably selects every row of the partition storage.
+            Cursor<DataRow> cursor = storage.scan(key -> true);
+
+            cursors.put(cursorId, new CursorMeta(cursor, rangeCmd.requesterNodeId()));
+        }
+        catch (StorageException e) {
+            clo.result(e);
+
+            // Return so the closure is not completed a second time with null, hiding the error.
+            return;
+        }
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}.
+     *
+     * <p>Reads up to {@code itemsToRetrieveCount} rows from the cursor registered under the command's scan id and
+     * completes the closure with a {@link MultiRowsResponse}; fewer rows are returned once the cursor has no more
+     * elements. If no cursor is registered for the scan id, the closure is completed with a
+     * {@link NoSuchElementException}; if reading fails, with the thrown exception.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        ScanRetrieveBatchCommand cmd = clo.command();
+
+        CursorMeta cursorDesc = cursors.get(cmd.scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on server side not found."));
+
+            // Without this return the code below would dereference the null cursor descriptor.
+            return;
+        }
+
+        Cursor<DataRow> cursor = cursorDesc.cursor();
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < cmd.itemsToRetrieveCount() && cursor.hasNext(); i++)
+                res.add(new ByteBufferRow(cursor.next().valueBytes()));
+        }
+        catch (Exception e) {
+            clo.result(e);
+
+            // Do not overwrite the reported failure with a partial response.
+            return;
+        }
+
+        clo.result(new MultiRowsResponse(res));
+    }
+
+    /**
+     * Handler for the {@link ScanCloseCommand}.
+     *
+     * <p>Unregisters the cursor stored under the command's scan id and closes it. An unknown scan id is treated as
+     * already closed and completes the closure with {@code null}. A failure while closing completes the closure with
+     * an {@link IgniteInternalException} wrapping the cause.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+        CursorMeta cursorDesc = cursors.remove(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(null);
+
+            return;
+        }
+
+        try {
+            cursorDesc.cursor().close();
+        }
+        catch (Exception e) {
+            // Report through the closure like the other scan handlers; throwing here would skip
+            // completing this closure entirely.
+            clo.result(new IgniteInternalException(e));
+
+            return;
+        }
+
+        clo.result(null);
+    }
+
/** {@inheritDoc} */
@Override public void onSnapshotSave(Path path, Consumer<Throwable>
doneClo) {
storage.snapshot(path).whenComplete((unused, throwable) -> {
@@ -494,4 +590,43 @@ public class PartitionListener implements
RaftGroupListener {
public Storage getStorage() {
return storage;
}
+
+    /**
+     * Cursor meta information: the storage cursor and the id of the node that requested it.
+     * Static because it does not use any state of the enclosing listener.
+     */
+    private static class CursorMeta {
+        /** Cursor. */
+        private final Cursor<DataRow> cursor;
+
+        /** Id of the node that creates cursor. */
+        private final String requesterNodeId;
+
+        /**
+         * The constructor.
+         *
+         * @param cursor Cursor.
+         * @param requesterNodeId Id of the node that creates cursor.
+         */
+        CursorMeta(Cursor<DataRow> cursor, String requesterNodeId) {
+            this.cursor = cursor;
+            this.requesterNodeId = requesterNodeId;
+        }
+
+        /**
+         * @return Cursor.
+         */
+        public Cursor<DataRow> cursor() {
+            return cursor;
+        }
+
+        /**
+         * @return Id of the node that creates cursor.
+         */
+        public String requesterNodeId() {
+            return requesterNodeId;
+        }
+    }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ef4ad90..633f1d2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -23,8 +23,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
@@ -44,16 +51,29 @@ import
org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
+import
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.schema.definition.SchemaManagementMode;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Storage of table rows.
*/
public class InternalTableImpl implements InternalTable {
+ /** Log. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(InternalTableImpl.class);
+
+ /** IgniteUuid generator. */
+ private static final IgniteUuidGenerator UUID_GENERATOR = new
IgniteUuidGenerator(UUID.randomUUID(), 0);
+
//TODO: IGNITE-15443 Use IntMap structure instead of HashMap.
/** Partition map. */
private Map<Integer, RaftGroupService> partitionMap;
@@ -253,6 +273,22 @@ public class InternalTableImpl implements InternalTable {
return collectMultiRowsResponses(futures);
}
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The returned publisher accepts a single subscriber. The {@code tx} argument is not used by this method.
+     *
+     * @throws IllegalArgumentException If {@code p} is outside {@code [0, partitions - 1]}.
+     */
+    @Override public @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx) {
+        boolean partitionInRange = p >= 0 && p < partitions;
+
+        if (!partitionInRange) {
+            String errMsg = LoggerMessageHelper.format(
+                "Invalid partition [partition={}, minValue={}, maxValue={}].", p, 0, partitions - 1);
+
+            throw new IllegalArgumentException(errMsg);
+        }
+
+        RaftGroupService partitionRaftGrp = partitionMap.get(p);
+
+        return new PartitionScanPublisher(partitionRaftGrp);
+    }
+
/**
* Map rows to partitions.
*
@@ -302,4 +338,151 @@ public class InternalTableImpl implements InternalTable {
return list;
});
}
+
+    /** Partition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** {@link RaftGroupService} of the scanned partition; all scan commands are run through it. */
+        private final RaftGroupService raftGrpSvc;
+
+        /** Set once the first subscriber subscribes: only a single subscription is supported. */
+        private final AtomicBoolean subscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.subscribed = new AtomicBoolean(false);
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * <p>Only the first subscriber gets a working subscription, which starts the server-side scan. Any later
+         * subscriber receives an inert subscription followed immediately by an {@link IllegalStateException}.
+         */
+        @Override public void subscribe(Subscriber<? super BinaryRow> subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!subscribed.compareAndSet(false, true)) {
+                // onSubscribe must precede any other signal (Reactive Streams rule 1.9), so hand out an inert
+                // subscription before the error. Returning here avoids opening a second server-side cursor.
+                subscriber.onSubscribe(new Subscription() {
+                    @Override public void request(long n) {
+                        // No-op: this subscription was rejected.
+                    }
+
+                    @Override public void cancel() {
+                        // No-op: this subscription was rejected.
+                    }
+                });
+
+                subscriber.onError(new IllegalStateException("Scan publisher does not support multiple subscriptions."));
+
+                return;
+            }
+
+            PartitionScanSubscription subscription = new PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** Subscriber that receives the scanned rows. */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** Set once the subscription is canceled or completed; no new batches are requested afterwards. */
+            private final AtomicBoolean canceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor. Starts the server-side scan by running a {@link ScanInitCommand}.
+             *
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> subscriber) {
+                this.subscriber = subscriber;
+                this.canceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", scanId));
+            }
+
+            /**
+             * {@inheritDoc}
+             *
+             * <p>A non-positive {@code n} cancels the subscription and signals an {@link IllegalArgumentException}.
+             */
+            @Override public void request(long n) {
+                if (n <= 0) {
+                    cancel();
+
+                    subscriber.onError(new IllegalArgumentException(LoggerMessageHelper.format(
+                        "Invalid requested amount of items [requested={}, minValue=1]", n)));
+
+                    return;
+                }
+
+                if (canceled.get())
+                    return;
+
+                scanBatch(n);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                cancel(true);
+            }
+
+            /**
+             * Cancels given subscription and closes cursor if necessary.
+             *
+             * @param closeCursor If {@code true} closes inner storage scan.
+             */
+            private void cancel(boolean closeCursor) {
+                if (!canceled.compareAndSet(false, true))
+                    return;
+
+                if (closeCursor) {
+                    // thenCompose (rather than thenRun) chains the close command's own future, so a failed close
+                    // is logged too, not only a failed scan init.
+                    scanInitOp.thenCompose(none -> raftGrpSvc.<Void>run(new ScanCloseCommand(scanId)))
+                        .exceptionally(closeT -> {
+                            LOG.warn("Unable to close scan.", closeT);
+
+                            return null;
+                        });
+                }
+            }
+
+            /**
+             * Requests and delivers {@code n} rows. A single batch command carries an {@code int} count, so larger
+             * demand is split into batches of at most {@link Integer#MAX_VALUE} rows. The next batch is requested
+             * only after the previous one has been delivered, which keeps batches sequential and rows in order.
+             *
+             * @param n Requested amount of items, positive.
+             */
+            private void scanBatch(long n) {
+                if (canceled.get())
+                    return;
+
+                final int batchSize = (int)Math.min(n, Integer.MAX_VALUE);
+
+                scanInitOp.thenCompose((none) -> raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(batchSize, scanId)))
+                    .thenAccept(
+                        res -> {
+                            if (res.getValues() == null) {
+                                cancel();
+
+                                subscriber.onComplete();
+
+                                return;
+                            }
+
+                            res.getValues().forEach(subscriber::onNext);
+
+                            if (res.getValues().size() < batchSize) {
+                                // Fewer rows than requested: the server-side cursor has no more rows.
+                                cancel();
+
+                                subscriber.onComplete();
+                            }
+                            else if (n > batchSize) {
+                                // Remaining demand is requested only after this batch was delivered.
+                                scanBatch(n - batchSize);
+                            }
+                        })
+                    .exceptionally(
+                        t -> {
+                            // The server answers with NoSuchElementException when no cursor is registered for
+                            // this scan id (e.g. it was already closed); such responses are ignored.
+                            if (t instanceof NoSuchElementException ||
+                                t instanceof CompletionException && t.getCause() instanceof NoSuchElementException)
+                                return null;
+
+                            cancel(!scanInitOp.isCompletedExceptionally());
+
+                            subscriber.onError(t);
+
+                            return null;
+                        });
+            }
+        }
+    }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index eceb875..e1fc978 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -25,13 +25,17 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
import java.util.stream.Collectors;
+import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.schema.definition.SchemaManagementMode;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Dummy table storage implementation.
@@ -235,6 +239,11 @@ public class DummyInternalTableImpl implements
InternalTable {
return null;
}
+    /**
+     * Scanning is not supported by this dummy implementation.
+     *
+     * @throws IgniteInternalException Always, wrapping an {@link OperationNotSupportedException}.
+     */
+    @Override public @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx) {
+        OperationNotSupportedException cause = new OperationNotSupportedException();
+
+        throw new IgniteInternalException(cause);
+    }
+
/**
* @param row Row.
* @return Extracted key.