This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3812e3a  IGNITE-15434 Reactive scan for table partitions.
3812e3a is described below

commit 3812e3a6543ba9f1ab62272b5a7967c2246d1949
Author: sanpwc <[email protected]>
AuthorDate: Wed Sep 29 19:36:43 2021 +0300

    IGNITE-15434 Reactive scan for table partitions.
    
    Signed-off-by: Andrey Gura <[email protected]>
---
 .../ignite/client/fakes/FakeInternalTable.java     |   8 +
 .../server/raft/MetaStorageListener.java           |   2 +-
 .../distributed/ITInternalTableScanTest.java       | 529 +++++++++++++++++++++
 .../ignite/internal/table/InternalTable.java       |  12 +-
 .../command/response/MultiRowsResponse.java        |  12 +-
 .../distributed/command/scan/ScanCloseCommand.java |  46 ++
 .../distributed/command/scan/ScanInitCommand.java  |  59 +++
 .../command/scan/ScanRetrieveBatchCommand.java     |  60 +++
 .../table/distributed/raft/PartitionListener.java  | 135 ++++++
 .../distributed/storage/InternalTableImpl.java     | 183 +++++++
 .../table/impl/DummyInternalTableImpl.java         |   9 +
 11 files changed, 1047 insertions(+), 8 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 18e5917..918cc46 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -22,8 +22,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.schema.definition.SchemaManagementMode;
 import org.apache.ignite.tx.Transaction;
@@ -224,4 +227,9 @@ public class FakeInternalTable implements InternalTable {
 
         return CompletableFuture.completedFuture(skipped);
     }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Publisher<BinaryRow> scan(int p, @Nullable 
Transaction tx) {
+        throw new IgniteInternalException(new 
OperationNotSupportedException());
+    }
 }
diff --git 
a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 98652bb..f52aaff 100644
--- 
a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -281,7 +281,7 @@ public class MetaStorageListener implements 
RaftGroupListener {
             else if (clo.command() instanceof CursorCloseCommand) {
                 CursorCloseCommand cursorCloseCmd = (CursorCloseCommand) 
clo.command();
 
-                CursorMeta cursorDesc = cursors.get(cursorCloseCmd.cursorId());
+                CursorMeta cursorDesc = 
cursors.remove(cursorCloseCmd.cursorId());
 
                 if (cursorDesc == null) {
                     clo.result(null);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
new file mode 100644
index 0000000..59a1113
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link InternalTable#scan(int, org.apache.ignite.tx.Transaction)}
+ */
+@ExtendWith(MockitoExtension.class)
+public class ITInternalTableScanTest {
+    /** */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    private static final RaftMessagesFactory FACTORY = new 
RaftMessagesFactory();
+
+    /** */
+    private static final String TEST_TABLE_NAME = "testTbl";
+
+    /** Mock partition storage. */
+    @Mock
+    private Storage mockStorage;
+
+    /** */
+    private ClusterService network;
+
+    /** */
+    private RaftServer raftSrv;
+
+    /** Internal table to test. */
+    private InternalTable internalTbl;
+
+    /**
+     * Prepare test environment:
+     * <ol>
+     * <li>Start network node.</li>
+     * <li>Start raft server.</li>
+     * <li>Prepare partitioned raft group.</li>
+     * <li>Prepare partitioned raft group service.</li>
+     * <li>Prepare internal table as a test object.</li>
+     * </ol>
+     *
+     * @throws Exception If any.
+     */
+    @BeforeEach
+    public void setUp(TestInfo testInfo) throws Exception {
+        NetworkAddress nodeNetworkAddress = new NetworkAddress("localhost", 
20_000);
+
+        network = ClusterServiceTestUtils.clusterService(
+            testInfo,
+            20_000,
+            new StaticNodeFinder(List.of(nodeNetworkAddress)),
+            SERIALIZATION_REGISTRY,
+            NETWORK_FACTORY
+        );
+
+        network.start();
+
+        raftSrv = new RaftServerImpl(network, FACTORY);
+
+        raftSrv.start();
+
+        String grpName = "test_part_grp";
+
+        List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
+
+        mockStorage = mock(Storage.class);
+
+        raftSrv.startRaftGroup(
+            grpName,
+            new PartitionListener(mockStorage),
+            conf
+        );
+
+        RaftGroupService raftGrpSvc = RaftGroupServiceImpl.start(
+            grpName,
+            network,
+            FACTORY,
+            10_000,
+            conf,
+            true,
+            200
+        ).get(3, TimeUnit.SECONDS);
+
+        internalTbl = new InternalTableImpl(
+            TEST_TABLE_NAME,
+            new IgniteUuidGenerator(UUID.randomUUID(), 0).randomUuid(),
+            Map.of(0, raftGrpSvc),
+            1
+        );
+    }
+
+    /**
+     * Cleanup previously started network and raft server.
+     *
+     * @throws Exception If failed to stop component.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (raftSrv != null)
+            raftSrv.beforeNodeStop();
+
+        if (network != null)
+            network.beforeNodeStop();
+
+        if (raftSrv != null)
+            raftSrv.stop();
+
+        if (network != null)
+            network.stop();
+    }
+
+    /**
+     * Checks whether publisher provides all existing data and then completes 
if requested by one row at a time.
+     */
+    @Test
+    public void testOneRowScan() throws Exception {
+        requestNTest(
+            List.of(
+                prepareDataRow("key1", "val1"),
+                prepareDataRow("key2", "val2")
+            ),
+            1);
+    }
+
+    /**
+     * Checks whether publisher provides all existing data and then completes 
if requested by multiple rows at a time.
+     */
+    @Test
+    public void testMultipleRowScan() throws Exception {
+        requestNTest(
+            List.of(
+                prepareDataRow("key1", "val1"),
+                prepareDataRow("key2", "val2"),
+                prepareDataRow("key3", "val3"),
+                prepareDataRow("key4", "val4"),
+                prepareDataRow("key5", "val5")
+            ),
+            2);
+    }
+
+    /**
+     * Checks whether {@link IllegalArgumentException} is thrown and inner 
storage cursor is closed in case of invalid
+     * requested amount of items.
+     *
+     * @throws Exception If any.
+     */
+    @Test()
+    public void testInvalidRequestedAmountScan() throws Exception {
+        AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+        when(mockStorage.scan(any())).thenAnswer(invocation -> {
+            var cursor = mock(Cursor.class);
+
+            doAnswer(
+                invocationClose -> {
+                    cursorClosed.set(true);
+                    return null;
+                }
+            ).when(cursor).close();
+
+            when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+                throw new StorageException("test");
+            });
+
+            return cursor;
+        });
+
+        for (long n : new long[] {-1, 0}) {
+            AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+            cursorClosed.set(false);
+
+            internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+                @Override public void onSubscribe(Subscription subscription) {
+                    subscription.request(n);
+                }
+
+                @Override public void onNext(BinaryRow item) {
+                    fail("Should never get here.");
+                }
+
+                @Override public void onError(Throwable throwable) {
+                    gotException.set(throwable);
+                }
+
+                @Override public void onComplete() {
+                    fail("Should never get here.");
+                }
+            });
+
+            assertTrue(waitForCondition(() -> gotException.get() != null, 
1_000));
+
+            assertTrue(waitForCondition(cursorClosed::get, 1_000));
+
+            assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    throw gotException.get();
+                }
+            );
+        }
+    }
+
+    /**
+     * Checks that an exception thrown by the storage cursor's {@code hasNext()} properly propagates 
to the subscriber.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+    @Test
+    public void testExceptionRowScanCursorHasNext() throws Exception {
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+        AtomicBoolean cursorClosed = new AtomicBoolean(false);
+
+        when(mockStorage.scan(any())).thenAnswer(invocation -> {
+            var cursor = mock(Cursor.class);
+
+            when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+                throw new StorageException("test");
+            });
+
+            doAnswer(
+                invocationClose -> {
+                    cursorClosed.set(true);
+                    return null;
+                }
+            ).when(cursor).close();
+
+            return cursor;
+        });
+
+        internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+            @Override public void onSubscribe(Subscription subscription) {
+                subscription.request(1);
+            }
+
+            @Override public void onNext(BinaryRow item) {
+                fail("Should never get here.");
+            }
+
+            @Override public void onError(Throwable throwable) {
+                gotException.set(throwable);
+            }
+
+            @Override public void onComplete() {
+                fail("Should never get here.");
+            }
+        });
+
+        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+        assertEquals(gotException.get().getCause().getClass(), 
StorageException.class);
+
+        assertTrue(waitForCondition(cursorClosed::get, 1_000));
+    }
+
+    /**
+     * Checks that exception from storage cursor creation properly propagates 
to subscriber.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15581")
+    @Test
+    public void testExceptionRowScan() throws Exception {
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+        when(mockStorage.scan(any())).thenThrow(new StorageException("Some 
storage exception"));
+
+        internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+
+            @Override public void onSubscribe(Subscription subscription) {
+                subscription.request(1);
+            }
+
+            @Override public void onNext(BinaryRow item) {
+                fail("Should never get here.");
+            }
+
+            @Override public void onError(Throwable throwable) {
+                gotException.set(throwable);
+            }
+
+            @Override public void onComplete() {
+                fail("Should never get here.");
+            }
+        });
+
+        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+        assertEquals(gotException.get().getCause().getClass(), 
StorageException.class);
+    }
+
+
+    /**
+     * Checks that {@link IllegalArgumentException} is thrown in case of 
invalid partition.
+     */
+    @Test()
+    public void testInvalidPartitionParameterScan() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> internalTbl.scan(-1, null)
+        );
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> internalTbl.scan(1, null)
+        );
+    }
+
+    /**
+     * Checks that in case of second subscription {@link 
IllegalStateException} will be passed to onError.
+     *
+     * @throws Exception If any.
+     */
+    @Test
+    public void testSecondSubscriptionFiresIllegalStateException() throws 
Exception {
+        Flow.Publisher<BinaryRow> scan = internalTbl.scan(0, null);
+
+        scan.subscribe(new Subscriber<>() {
+            @Override public void onSubscribe(Subscription subscription) {
+
+            }
+
+            @Override public void onNext(BinaryRow item) {
+
+            }
+
+            @Override public void onError(Throwable throwable) {
+
+            }
+
+            @Override public void onComplete() {
+
+            }
+        });
+
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+        scan.subscribe(new Subscriber<>() {
+            @Override public void onSubscribe(Subscription subscription) {
+
+            }
+
+            @Override public void onNext(BinaryRow item) {
+
+            }
+
+            @Override public void onError(Throwable throwable) {
+                gotException.set(throwable);
+            }
+
+            @Override public void onComplete() {
+
+            }
+        });
+
+        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+
+        assertEquals(gotException.get().getClass(), 
IllegalStateException.class);
+    }
+
+    /**
+     * Checks that {@link NullPointerException} is thrown in case of null 
subscriber.
+     */
+    @Test
+    public void testNullPointerExceptionIsThrownInCaseOfNullSubscription() {
+        assertThrows(
+            NullPointerException.class,
+            () -> internalTbl.scan(0, null).subscribe(null)
+        );
+    }
+
+    /**
+     * Helper method to convert key and value to {@link DataRow}.
+     *
+     * @param entryKey Key.
+     * @param entryVal Value
+     * @return {@link DataRow} based on given key and value.
+     * @throws java.io.IOException If failed to close output stream that was 
used for the conversion.
+     */
+    private static @NotNull DataRow prepareDataRow(@NotNull String entryKey,
+        @NotNull String entryVal) throws IOException {
+        byte[] keyBytes = ByteUtils.toBytes(entryKey);
+
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) 
{
+            outputStream.write(keyBytes);
+            outputStream.write(ByteUtils.toBytes(entryVal));
+
+            return new SimpleDataRow(keyBytes, outputStream.toByteArray());
+        }
+    }
+
+    /**
+     * Checks whether publisher provides all existing data and then completes 
if requested by reqAmount rows at a time.
+     *
+     * @param submittedItems Items to be pushed by publisher.
+     * @param reqAmount Amount of rows to request at a time.
+     * @throws Exception If Any.
+     */
+    private void requestNTest(List<DataRow> submittedItems, int reqAmount) 
throws Exception {
+        AtomicInteger cursorTouchCnt = new AtomicInteger(0);
+
+        List<BinaryRow> retrievedItems = Collections.synchronizedList(new 
ArrayList<>());
+
+        when(mockStorage.scan(any())).thenAnswer(invocation -> {
+            var cursor = mock(Cursor.class);
+
+            when(cursor.hasNext()).thenAnswer(hnInvocation -> 
cursorTouchCnt.get() < submittedItems.size());
+
+            when(cursor.next()).thenAnswer(nInvocation -> 
submittedItems.get(cursorTouchCnt.getAndIncrement()));
+
+            return cursor;
+        });
+
+        AtomicBoolean noMoreData = new AtomicBoolean(false);
+
+        internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+            private Subscription subscription;
+
+            @Override public void onSubscribe(Subscription subscription) {
+                this.subscription = subscription;
+
+                subscription.request(reqAmount);
+            }
+
+            @Override public void onNext(BinaryRow item) {
+                retrievedItems.add(item);
+
+                if (retrievedItems.size() % reqAmount == 0)
+                    subscription.request(reqAmount);
+            }
+
+            @Override public void onError(Throwable throwable) {
+                fail("onError call is not expected.");
+            }
+
+            @Override public void onComplete() {
+                noMoreData.set(true);
+            }
+        });
+
+        assertTrue(waitForCondition(() -> retrievedItems.size() == 
submittedItems.size(), 2_000));
+
+        List<byte[]> expItems = 
submittedItems.stream().map(DataRow::valueBytes).collect(Collectors.toList());
+        List<byte[]> gotItems = 
retrievedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList());
+
+        for (int i = 0; i < expItems.size(); i++)
+            assertTrue(Arrays.equals(expItems.get(i), gotItems.get(i)));
+
+        assertTrue(noMoreData.get(), "More data is not expected.");
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index f6faafb..6466769 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.schema.definition.SchemaManagementMode;
@@ -195,5 +196,14 @@ public interface InternalTable {
     CompletableFuture<Collection<BinaryRow>> 
deleteAllExact(Collection<BinaryRow> rows,
         @Nullable Transaction tx);
 
-    //TODO: IGNTIE-14488. Add invoke() methods.
+    /**
+     * Scans given partition, providing {@link Publisher<BinaryRow>} that 
reactively notifies about partition rows.
+     *
+     * @param p The partition.
+     * @param tx The transaction.
+     * @return {@link Publisher<BinaryRow>} that reactively notifies about 
partition rows.
+     */
+    @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
+
+    //TODO: IGNITE-14488. Add invoke() methods.
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
index 7c9d969..2cd6097 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.table.distributed.command.response;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.distributed.command.CommandUtils;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
@@ -36,7 +36,7 @@ import 
org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
  */
 public class MultiRowsResponse implements Serializable {
     /** Binary rows. */
-    private transient Collection<BinaryRow> rows;
+    private transient List<BinaryRow> rows;
 
     /*
      * Row bytes.
@@ -50,7 +50,7 @@ public class MultiRowsResponse implements Serializable {
      *
      * @param rows Collection of binary rows.
      */
-    public MultiRowsResponse(Collection<BinaryRow> rows) {
+    public MultiRowsResponse(List<BinaryRow> rows) {
         this.rows = rows;
 
         CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
@@ -59,9 +59,9 @@ public class MultiRowsResponse implements Serializable {
     /**
      * @return Binary rows.
      */
-    public Collection<BinaryRow> getValues() {
+    public List<BinaryRow> getValues() {
         if (rows == null && rowsBytes != null) {
-            rows = new HashSet<>();
+            rows = new ArrayList<>();
 
             CommandUtils.readRows(rowsBytes, rows::add);
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
new file mode 100644
index 0000000..db3cd58
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.scan;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Scan close command for PartitionListener that closes scan with given id.
+ */
+public class ScanCloseCommand implements WriteCommand {
+    /** Id of scan that is associated with the current command. */
+    @NotNull private final IgniteUuid scanId;
+
+    /**
+     * The Constructor.
+     *
+     * @param scanId Id of scan that is associated with the current command.
+     */
+    public ScanCloseCommand(@NotNull IgniteUuid scanId) {
+        this.scanId = scanId;
+    }
+
+    /**
+     * @return Id of scan that is associated with the current command.
+     */
+    public @NotNull IgniteUuid scanId() {
+        return scanId;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
new file mode 100644
index 0000000..9cc397c
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.scan;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Scan init command for PartitionListener that prepares server-side scan for 
further iteration over it.
+ */
+public class ScanInitCommand implements WriteCommand {
+    /** Id of the node that requests scan. */
+    @NotNull private final String requesterNodeId;
+
+    /** Id of scan that is associated with the current command. */
+    @NotNull private final IgniteUuid scanId;
+
+    /**
+     * @param requesterNodeId Id of the node that requests scan.
+     * @param scanId Id of scan that is associated with the current command.
+     */
+    public ScanInitCommand(
+        @NotNull String requesterNodeId,
+        @NotNull IgniteUuid scanId
+    ) {
+        this.requesterNodeId = requesterNodeId;
+        this.scanId = scanId;
+    }
+
+    /**
+     * @return Id of the node that requests scan.
+     */
+    public @NotNull String requesterNodeId() {
+        return requesterNodeId;
+    }
+
+    /**
+     * @return Id of scan that is associated with the current command.
+     */
+    @NotNull public IgniteUuid scanId() {
+        return scanId;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanRetrieveBatchCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanRetrieveBatchCommand.java
new file mode 100644
index 0000000..7cfd882
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanRetrieveBatchCommand.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.scan;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Scan retrieve batch command for PartitionListener that retrieves batch of data from previously prepared server scan,
+ * see {@link ScanInitCommand} for more details.
+ * <p>
+ * The listener answers with at most {@link #itemsToRetrieveCount()} rows read from the cursor registered under
+ * {@link #scanId()}; fewer rows are returned when the cursor has no more data.
+ */
+public class ScanRetrieveBatchCommand implements WriteCommand {
+    /** Amount of items to retrieve. */
+    private final int itemsToRetrieveCnt;
+
+    /** Id of scan that is associated with the current command. */
+    @NotNull private final IgniteUuid scanId;
+
+    /**
+     * Creates a batch retrieval command.
+     *
+     * @param itemsToRetrieveCnt Amount of items to retrieve.
+     * @param scanId Id of scan that is associated with the current command.
+     */
+    public ScanRetrieveBatchCommand(
+        int itemsToRetrieveCnt,
+        @NotNull IgniteUuid scanId
+    ) {
+        this.itemsToRetrieveCnt = itemsToRetrieveCnt;
+        this.scanId = scanId;
+    }
+
+    /**
+     * Returns the upper bound on the number of rows in the response.
+     *
+     * @return Amount of items to retrieve.
+     */
+    public int itemsToRetrieveCount() {
+        return itemsToRetrieveCnt;
+    }
+
+    /**
+     * Returns the id of the server-side cursor to read from.
+     *
+     * @return Id of scan that is associated with the current command.
+     */
+    @NotNull public IgniteUuid scanId() {
+        return scanId;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index dc816a4..0301fb4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.table.distributed.raft;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -29,6 +33,7 @@ import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.SearchRow;
 import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
 import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
 import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
@@ -52,7 +57,12 @@ import 
org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
 import 
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import 
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.ReadCommand;
 import org.apache.ignite.raft.client.WriteCommand;
@@ -68,6 +78,9 @@ public class PartitionListener implements RaftGroupListener {
     /** Partition storage. */
     private final Storage storage;
 
+    /** Cursors map. */
+    private final Map<IgniteUuid, CursorMeta> cursors;
+
     /**
      * Constructor.
      *
@@ -75,6 +88,7 @@ public class PartitionListener implements RaftGroupListener {
      */
     public PartitionListener(Storage storage) {
         this.storage = storage;
+        this.cursors = new ConcurrentHashMap<>();
     }
 
     /** {@inheritDoc} */
@@ -120,6 +134,12 @@ public class PartitionListener implements 
RaftGroupListener {
                 
handleGetAndReplaceCommand((CommandClosure<GetAndReplaceCommand>) clo);
             else if (command instanceof GetAndUpsertCommand)
                 
handleGetAndUpsertCommand((CommandClosure<GetAndUpsertCommand>) clo);
+            else if (command instanceof ScanInitCommand)
+                handleScanInitCommand((CommandClosure<ScanInitCommand>) clo);
+            else if (command instanceof ScanRetrieveBatchCommand)
+                
handleScanRetrieveBatchCommand((CommandClosure<ScanRetrieveBatchCommand>) clo);
+            else if (command instanceof ScanCloseCommand)
+                handleScanCloseCommand((CommandClosure<ScanCloseCommand>) clo);
             else
                 assert false : "Command was not found [cmd=" + command + ']';
         });
@@ -417,6 +437,82 @@ public class PartitionListener implements 
RaftGroupListener {
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}: opens a cursor over the whole partition storage and registers it
+     * in the cursors map under the scan id carried by the command.
+     * <p>
+     * The closure is completed exactly once: with {@code null} when the cursor has been registered, or with
+     * the {@link StorageException} when the storage fails to open the cursor.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        try {
+            // The filter accepts every key, i.e. this is a full scan of the partition.
+            Cursor<DataRow> cursor = storage.scan(key -> true);
+
+            cursors.put(
+                cursorId,
+                new CursorMeta(
+                    cursor,
+                    rangeCmd.requesterNodeId()
+                )
+            );
+        }
+        catch (StorageException e) {
+            clo.result(e);
+
+            // The failure has already been reported; the closure must not be completed a second time.
+            return;
+        }
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}: reads up to the requested number of rows from the cursor
+     * registered under the command's scan id.
+     * <p>
+     * The closure is completed exactly once:
+     * <ul>
+     *     <li>with a {@link NoSuchElementException} if no cursor is registered for the scan id;</li>
+     *     <li>with the thrown exception if reading from the cursor fails;</li>
+     *     <li>otherwise with a {@link MultiRowsResponse} holding the rows read (possibly fewer than requested,
+     *     or none, when the cursor is exhausted).</li>
+     * </ul>
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on server side not found."));
+
+            // Without a cursor there is nothing to read; falling through would dereference null.
+            return;
+        }
+
+        int itemsToRetrieve = clo.command().itemsToRetrieveCount();
+
+        Cursor<DataRow> cursor = cursorDesc.cursor();
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < itemsToRetrieve && cursor.hasNext(); i++)
+                res.add(new ByteBufferRow(cursor.next().valueBytes()));
+        }
+        catch (Exception e) {
+            clo.result(e);
+
+            // The failure has already been reported; do not follow it with a (partial) success response.
+            return;
+        }
+
+        clo.result(new MultiRowsResponse(res));
+    }
+
+    /**
+     * Handler for the {@link ScanCloseCommand}: unregisters the cursor associated with the command's scan id and
+     * closes it. An unknown scan id is not an error, the closure is simply completed with {@code null}.
+     *
+     * @param clo Command closure.
+     * @throws IgniteInternalException If closing the cursor fails; the closure is not completed in that case.
+     */
+    private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+        CursorMeta removed = cursors.remove(clo.command().scanId());
+
+        if (removed != null) {
+            try {
+                removed.cursor().close();
+            }
+            catch (Exception closeErr) {
+                throw new IgniteInternalException(closeErr);
+            }
+        }
+
+        // Reached both when there was nothing to close and after a successful close.
+        clo.result(null);
+    }
+
     /** {@inheritDoc} */
     @Override public void onSnapshotSave(Path path, Consumer<Throwable> 
doneClo) {
         storage.snapshot(path).whenComplete((unused, throwable) -> {
@@ -494,4 +590,43 @@ public class PartitionListener implements 
RaftGroupListener {
     public Storage getStorage() {
         return storage;
     }
+
+    /**
+     * Cursor meta information: the storage cursor and the id of the node that requested it.
+     * <p>
+     * Declared {@code static} because it does not use any state of the enclosing listener.
+     */
+    private static class CursorMeta {
+        /** Cursor. */
+        private final Cursor<DataRow> cursor;
+
+        /** Id of the node that creates cursor. */
+        private final String requesterNodeId;
+
+        /**
+         * The constructor.
+         *
+         * @param cursor Cursor.
+         * @param requesterNodeId Id of the node that creates cursor.
+         */
+        CursorMeta(
+            Cursor<DataRow> cursor,
+            String requesterNodeId
+        ) {
+            this.cursor = cursor;
+            this.requesterNodeId = requesterNodeId;
+        }
+
+        /**
+         * @return Cursor.
+         */
+        public Cursor<DataRow> cursor() {
+            return cursor;
+        }
+
+        /**
+         * @return Id of the node that creates cursor.
+         */
+        public String requesterNodeId() {
+            return requesterNodeId;
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ef4ad90..633f1d2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -23,8 +23,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
@@ -44,16 +51,29 @@ import 
org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
 import 
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import 
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
+import 
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.schema.definition.SchemaManagementMode;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Storage of table rows.
  */
 public class InternalTableImpl implements InternalTable {
+    /** Log. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(InternalTableImpl.class);
+
+    /** IgniteUuid generator. */
+    private static final IgniteUuidGenerator UUID_GENERATOR = new 
IgniteUuidGenerator(UUID.randomUUID(), 0);
+
     //TODO: IGNITE-15443 Use IntMap structure instead of HashMap.
     /** Partition map. */
     private Map<Integer, RaftGroupService> partitionMap;
@@ -253,6 +273,22 @@ public class InternalTableImpl implements InternalTable {
         return collectMultiRowsResponses(futures);
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Validates the partition index and returns a publisher bound to that partition's raft group service.
+     *
+     * @throws IllegalArgumentException If {@code p} is negative or not less than the number of partitions.
+     */
+    @Override public @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx) {
+        boolean withinRange = 0 <= p && p < partitions;
+
+        if (!withinRange) {
+            String errMsg = LoggerMessageHelper.format(
+                "Invalid partition [partition={}, minValue={}, maxValue={}].",
+                p,
+                0,
+                partitions - 1
+            );
+
+            throw new IllegalArgumentException(errMsg);
+        }
+
+        RaftGroupService partSvc = partitionMap.get(p);
+
+        return new PartitionScanPublisher(partSvc);
+    }
+
     /**
      * Map rows to partitions.
      *
@@ -302,4 +338,151 @@ public class InternalTableImpl implements InternalTable {
                     return list;
                 });
     }
+
+    /**
+     * Partition scan publisher: a single-subscriber {@link Publisher} that delivers the rows of one partition
+     * by running scan init / retrieve batch / close commands through the partition's raft group service.
+     */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** Raft group service of the scanned partition, used to run the scan commands. */
+        private final RaftGroupService raftGrpSvc;
+
+        /** Becomes {@code true} on the first subscription; every later subscription is rejected. */
+        private final AtomicBoolean subscribed;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link RaftGroupService} to run corresponding raft commands.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.raftGrpSvc = raftGrpSvc;
+            this.subscribed = new AtomicBoolean(false);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Only the first subscriber gets a subscription; any other subscriber receives an
+         * {@link IllegalStateException} through {@code onError} and nothing else.
+         */
+        @Override public void subscribe(Subscriber<? super BinaryRow> subscriber) {
+            if (subscriber == null)
+                throw new NullPointerException("Subscriber is null");
+
+            if (!subscribed.compareAndSet(false, true)) {
+                subscriber.onError(new IllegalStateException("Scan publisher does not support multiple subscriptions."));
+
+                // A rejected subscriber must not get a subscription: creating one would start another server-side scan.
+                return;
+            }
+
+            PartitionScanSubscription subscription = new PartitionScanSubscription(subscriber);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** Subscriber that receives the rows and the terminal signal. */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** Set once the subscription is canceled: explicitly, on completion, or on error. */
+            private final AtomicBoolean canceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor. Immediately submits the {@link ScanInitCommand} for a freshly generated scan id;
+             * all further commands of this subscription are chained after that operation.
+             *
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> subscriber) {
+                this.subscriber = subscriber;
+                this.canceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", scanId));
+            }
+
+            /**
+             * {@inheritDoc}
+             * <p>
+             * A non-positive {@code n} cancels the subscription and signals an {@link IllegalArgumentException}.
+             * Otherwise the demand is split into batches of at most {@link Integer#MAX_VALUE} items.
+             */
+            @Override public void request(long n) {
+                if (n <= 0) {
+                    cancel();
+
+                    subscriber.onError(new IllegalArgumentException(LoggerMessageHelper.
+                        format("Invalid requested amount of items [requested={}, minValue=1]", n))
+                    );
+
+                    return;
+                }
+
+                if (canceled.get())
+                    return;
+
+                final int internalBatchSize = Integer.MAX_VALUE;
+
+                long fullBatches = n / internalBatchSize;
+
+                // The counter must be a long: for large n (e.g. Long.MAX_VALUE) the number of full batches does not
+                // fit into an int, and an overflowing int counter would never reach the bound.
+                // NOTE(review): batches are submitted without waiting for the previous one, so a huge demand still
+                // issues commands until the scan is canceled or completed - consider tracking demand instead.
+                for (long batch = 0; batch < fullBatches && !canceled.get(); batch++)
+                    scanBatch(internalBatchSize);
+
+                int remainder = (int)(n % internalBatchSize);
+
+                // A zero-size batch would only cost a raft round trip and deliver nothing.
+                if (remainder > 0)
+                    scanBatch(remainder);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                cancel(true);
+            }
+
+            /**
+             * Cancels given subscription and closes cursor if necessary. Only the first call has an effect.
+             *
+             * @param closeCursor If {@code true} closes inner storage scan.
+             */
+            private void cancel(boolean closeCursor) {
+                if (!canceled.compareAndSet(false, true))
+                    return;
+
+                if (closeCursor) {
+                    // thenCompose (not thenRun) keeps the close command's future in the chain, so that a failure of
+                    // the close command itself reaches the logging handler below.
+                    scanInitOp.thenCompose(none -> raftGrpSvc.<Void>run(new ScanCloseCommand(scanId))).exceptionally(closeT -> {
+                        LOG.warn("Unable to close scan.", closeT);
+
+                        return null;
+                    });
+                }
+            }
+
+            /**
+             * Requests and processes n requested elements where n is an integer.
+             * <p>
+             * Rows are passed to {@code onNext} in the order of the response. A {@code null} row list or a list
+             * shorter than {@code n} means the scan is exhausted: the subscription is canceled and
+             * {@code onComplete} is signaled.
+             *
+             * @param n Requested amount of items.
+             */
+            private void scanBatch(int n) {
+                if (canceled.get())
+                    return;
+
+                scanInitOp.thenCompose((none) -> raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))
+                    .thenAccept(
+                        res -> {
+                            if (res.getValues() == null) {
+                                cancel();
+
+                                subscriber.onComplete();
+
+                                return;
+                            }
+                            else
+                                res.getValues().forEach(subscriber::onNext);
+
+                            if (res.getValues().size() < n) {
+                                cancel();
+
+                                subscriber.onComplete();
+                            }
+                        })
+                    .exceptionally(
+                        t -> {
+                            // The server reports a missing cursor with NoSuchElementException; it is ignored here
+                            // rather than signaled to the subscriber.
+                            if (t instanceof NoSuchElementException ||
+                                t instanceof CompletionException && t.getCause() instanceof NoSuchElementException)
+                                return null;
+
+                            // If the init operation itself failed, no close command is sent.
+                            cancel(!scanInitOp.isCompletedExceptionally());
+
+                            subscriber.onError(t);
+
+                            return null;
+                        });
+            }
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index eceb875..e1fc978 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -25,13 +25,17 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
 import java.util.stream.Collectors;
+import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.schema.definition.SchemaManagementMode;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Dummy table storage implementation.
@@ -235,6 +239,11 @@ public class DummyInternalTableImpl implements 
InternalTable {
         return null;
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Scanning is not supported by this dummy implementation: the method always throws.
+     *
+     * @throws IgniteInternalException Always, wrapping an {@link OperationNotSupportedException}.
+     */
+    @Override public @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx) {
+        OperationNotSupportedException cause = new OperationNotSupportedException();
+
+        throw new IgniteInternalException(cause);
+    }
+
     /**
      * @param row Row.
      * @return Extracted key.

Reply via email to