This is an automated email from the ASF dual-hosted git repository. tledkov pushed a commit to branch ignite-17068 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 319e41272d9c1f0c9176c77865626bd39bc62244 Author: tledkov <[email protected]> AuthorDate: Wed Jun 1 13:25:43 2022 +0300 IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics --- .../apache/ignite/sql/async/AsyncResultSet.java | 23 +++++++ .../internal/sql/api/ItSqlAsynchronousApiTest.java | 75 ++++------------------ .../internal/sql/api/AsyncResultSetImpl.java | 54 +++++++--------- 3 files changed, 58 insertions(+), 94 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java index 58da53fe0..2eacedad4 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java @@ -27,6 +27,25 @@ import org.jetbrains.annotations.Nullable; /** * Asynchronous result set provides methods for query results processing in asynchronous way. * + * <p>Usage example: + * <pre><code> + * private CompletionStage<Void> fetchAllRowsInto( + * AsyncResultSet resultSet, + * List<SqlRow> target + * ) { + * for (var row : resultSet.currentPage()) { + * target.add(row); + * } + * + * if (!resultSet.hasMorePages()) { + * return CompletableFuture.completedFuture(null); + * } + * + * return resultSet.fetchNextPage() + * .thenCompose(res -> fetchAllRowsInto(res, target)); + * } + * </code></pre> + * * @see ResultSet */ public interface AsyncResultSet { @@ -91,6 +110,10 @@ public interface AsyncResultSet { /** * Fetch the next page of results asynchronously. + * The future that is completed with the same {@code AsyncResultSet} object. + * The current page is changed after the future complete. + * The methods {@link #currentPage()}, {@link #currentPageSize()}, {@link #hasMorePages()} + * use current page and return consistent results between complete last page future and call {@code fetchNextPage}. * * @return Operation future. * @throws NoRowSetExpectedException if no row set is expected as a query result. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 3d48a8b66..56ce8b2c4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -29,9 +29,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -238,21 +236,27 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest { Session ses = sql.sessionBuilder().defaultPageSize(1).build(); AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID").get(); + var p0 = ars0.currentPage(); AsyncResultSet ars1 = ars0.fetchNextPage().toCompletableFuture().get(); + var p1 = ars1.currentPage(); AsyncResultSet ars2 = ars1.fetchNextPage().toCompletableFuture().get(); + var p2 = ars2.currentPage(); AsyncResultSet ars3 = ars1.fetchNextPage().toCompletableFuture().get(); + var p3 = ars3.currentPage(); AsyncResultSet ars4 = ars0.fetchNextPage().toCompletableFuture().get(); + var p4 = ars4.currentPage(); - assertSame(ars1, ars4); - assertSame(ars2, ars3); + assertSame(ars0, ars1); + assertSame(ars0, ars2); + assertSame(ars0, ars3); + assertSame(ars0, ars4); - List<SqlRow> res = Stream.of(ars0, ars1, ars2) - .map(AsyncResultSet::currentPage) + List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4) .flatMap(p -> StreamSupport.stream(p.spliterator(), false)) .collect(Collectors.toList()); TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - res.size()); - ars3.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get(); + ars4.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get(); res.addAll(pageProc.result()); @@ -261,63 +265,6 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest { } } - @Test - public void fetchNextPageParallel() throws Exception { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } - - final List<SqlRow> res = new ArrayList<>(); - - IgniteSql sql = CLUSTER_NODES.get(0).sql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); - - AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST").get(); - StreamSupport.stream(ars0.currentPage().spliterator(), false).forEach(res::add); - - AtomicInteger cnt = new AtomicInteger(); - ConcurrentHashMap<Integer, AsyncResultSet> results = new ConcurrentHashMap<>(); - - IgniteTestUtils.runMultiThreaded( - () -> { - AsyncResultSet ars = ars0.fetchNextPage().toCompletableFuture().get(); - - results.put(cnt.getAndIncrement(), ars); - - assertFalse(ars.hasMorePages()); - - return null; - }, - 10, - "test-fetch"); - - AsyncResultSet ars1 = CollectionUtils.first(results.values()); - StreamSupport.stream(ars1.currentPage().spliterator(), false).forEach(res::add); - - // Check that all next page are same. - results.values().forEach(ars -> assertSame(ars1, ars)); - - assertThrowsWithCause( - () -> ars1.fetchNextPage().toCompletableFuture().get(), - IgniteSqlException.class, - "There are no more pages" - ); - - await(ars0.closeAsync()); - - // Check results - Set<Integer> rs = res.stream().map(r -> r.intValue(0)).collect(Collectors.toSet()); - - for (int i = 0; i < ROW_COUNT; ++i) { - assertTrue(rs.remove(i), "Results invalid: " + res); - } - - assertTrue(rs.isEmpty()); - - checkSession(ses); - } - @Test public void errors() { IgniteSql sql = CLUSTER_NODES.get(0).sql(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java index 1a2ee8255..855edd243 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java @@ -50,20 +50,16 @@ import org.jetbrains.annotations.Nullable; */ public class AsyncResultSetImpl implements AsyncResultSet { private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE = - CompletableFuture.failedFuture(new IgniteSqlException("There are no more pages.")); + CompletableFuture.failedFuture(new IgniteSqlException("No more pages.")); private final AsyncSqlCursor<List<Object>> cur; - private final BatchedResult<List<Object>> batchPage; + private volatile BatchedResult<List<Object>> curPage; private final int pageSize; private final Runnable closeRun; - private final Object mux = new Object(); - - private volatile CompletionStage<? extends AsyncResultSet> next; - /** * Constructor. * @@ -71,15 +67,15 @@ public class AsyncResultSetImpl implements AsyncResultSet { */ public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur, BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) { this.cur = cur; - this.batchPage = page; + this.curPage = page; this.pageSize = pageSize; this.closeRun = closeRun; assert cur.queryType() == SqlQueryType.QUERY || ((cur.queryType() == SqlQueryType.DML || cur.queryType() == SqlQueryType.DDL) - && batchPage.items().size() == 1 - && batchPage.items().get(0).size() == 1 - && !batchPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + batchPage + ']'; + && curPage.items().size() == 1 + && curPage.items().get(0).size() == 1 + && !curPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + curPage + ']'; } /** {@inheritDoc} */ @@ -119,9 +115,9 @@ public class AsyncResultSetImpl implements AsyncResultSet { return -1; } - assert batchPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + batchPage; + assert curPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + curPage; - return (long) batchPage.items().get(0).get(0); + return (long) curPage.items().get(0).get(0); } /** {@inheritDoc} */ @@ -131,9 +127,9 @@ public class AsyncResultSetImpl implements AsyncResultSet { return false; } - assert batchPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + batchPage; + assert curPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + curPage; - return (boolean) batchPage.items().get(0).get(0); + return (boolean) curPage.items().get(0).get(0); } /** {@inheritDoc} */ @@ -141,7 +137,9 @@ public class AsyncResultSetImpl implements AsyncResultSet { public Iterable<SqlRow> currentPage() { requireResultSet(); - return () -> new TransformingIterator<>(batchPage.items().iterator(), SqlRowImpl::new); + final Iterator<List<Object>> it0 = curPage.items().iterator(); + + return () -> new TransformingIterator<>(it0, SqlRowImpl::new); } /** {@inheritDoc} */ @@ -149,32 +147,28 @@ public class AsyncResultSetImpl implements AsyncResultSet { public int currentPageSize() { requireResultSet(); - return batchPage.items().size(); + return curPage.items().size(); } /** {@inheritDoc} */ @Override public CompletionStage<? extends AsyncResultSet> fetchNextPage() { - if (next == null) { - synchronized (mux) { - if (next == null) { - if (!hasMorePages()) { - next = HAS_NO_MORE_PAGE_FUTURE; - } else { - next = cur.requestNextAsync(pageSize) - .thenApply(batchRes -> new AsyncResultSetImpl(cur, batchRes, pageSize, closeRun)); - } - } - } - } + if (!hasMorePages()) { + return HAS_NO_MORE_PAGE_FUTURE; + } else { + return cur.requestNextAsync(pageSize) + .thenApply(page -> { + curPage = page; - return next; + return AsyncResultSetImpl.this; + }); + } } /** {@inheritDoc} */ @Override public boolean hasMorePages() { - return batchPage.hasMore(); + return curPage.hasMore(); } /** {@inheritDoc} */
