This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new 74230a4 IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261 74230a4 is described below commit 74230a45e0c5213bf48bcda1be333389702af52d Author: zstan <stanilov...@gmail.com> AuthorDate: Tue Sep 22 13:00:16 2020 +0300 IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261 --- .../query/calcite/exec/rel/RootNode.java | 176 +++++++++------------ .../query/calcite/CalciteQueryProcessorTest.java | 50 ++++++ .../processors/query/calcite/CancelTest.java | 4 +- 3 files changed, 126 insertions(+), 104 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java index f38c613..75a6962 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java @@ -25,7 +25,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.query.IgniteSQLException; @@ -33,39 +32,43 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG; + /** * Client iterator. */ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row>, Iterator<Row> { /** */ - private final ReentrantLock lock; + private final ReentrantLock lock = new ReentrantLock(); /** */ - private final Condition cond; + private final Condition cond = lock.newCondition(); /** */ - private final Deque<Row> buff; + private final Runnable onClose; /** */ - private final Runnable onClose; + private final AtomicReference<Throwable> ex = new AtomicReference<>(); /** */ - private volatile State state = State.RUNNING; + private int waiting; /** */ - private final AtomicReference<Throwable> ex = new AtomicReference<>(); + private Deque<Row> inBuff = new ArrayDeque<>(IN_BUFFER_SIZE); /** */ - private Row row; + private Deque<Row> outBuff = new ArrayDeque<>(IN_BUFFER_SIZE); /** */ - private int waiting; + private volatile boolean closed; /** * @param ctx Execution context. */ public RootNode(ExecutionContext<Row> ctx) { - this(ctx, null); + super(ctx); + + onClose = this::closeInternal; } /** @@ -74,10 +77,6 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, public RootNode(ExecutionContext<Row> ctx, Runnable onClose) { super(ctx); - buff = new ArrayDeque<>(IN_BUFFER_SIZE); - lock = new ReentrantLock(); - cond = lock.newCondition(); - this.onClose = onClose; } @@ -88,14 +87,15 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, /** {@inheritDoc} */ @Override public void close() { + if (closed) + return; + lock.lock(); try { - if (state == State.RUNNING) - state = State.CANCELLED; - else if (state == State.END) - state = State.CLOSED; - else - return; + if (waiting != -1) + ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED)); + + closed = true; // an exception has to be set first to get right check order cond.signalAll(); } @@ -103,84 +103,60 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, lock.unlock(); } - if (onClose == null) - closeInternal(); - else - onClose.run(); + onClose.run(); } /** {@inheritDoc} */ @Override protected boolean isClosed() { - return state == State.CANCELLED || state == State.CLOSED; + return closed; } /** {@inheritDoc} */ @Override public void closeInternal() { - context().execute(() -> { - buff.clear(); - - U.closeQuiet(super::close); - }); + context().execute(() -> sources().forEach(U::closeQuiet)); } /** {@inheritDoc} */ @Override public void push(Row row) { assert waiting > 0; + lock.lock(); try { checkState(); - int req = 0; - - lock.lock(); - try { - if (state != State.RUNNING) - return; - - waiting--; - - buff.offer(row); + waiting--; - if (waiting == 0) - waiting = req = IN_BUFFER_SIZE - buff.size(); + inBuff.offer(row); + if (inBuff.size() == IN_BUFFER_SIZE) cond.signalAll(); - } - finally { - lock.unlock(); - } - - if (req > 0) - source().request(req); } catch (Exception e) { onError(e); } + finally { + lock.unlock(); + } } /** {@inheritDoc} */ @Override public void end() { + assert waiting > 0; + + lock.lock(); try { checkState(); - lock.lock(); - try { - assert waiting > 0 : "waiting=" + waiting; - - waiting = -1; + waiting = -1; - if (state != State.RUNNING) - return; - - cond.signalAll(); - } - finally { - lock.unlock(); - } + cond.signalAll(); } catch (Exception e) { onError(e); } + finally { + lock.unlock(); + } } /** {@inheritDoc} */ @@ -193,12 +169,17 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, /** {@inheritDoc} */ @Override public boolean hasNext() { - if (row != null) + checkException(); + + if (!outBuff.isEmpty()) return true; - else if (state == State.END || state == State.CLOSED) + + if (closed && ex.get() == null) return false; - else - return (row = take()) != null; + + exchangeBuffers(); + + return !outBuff.isEmpty(); } /** {@inheritDoc} */ @@ -206,10 +187,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, if (!hasNext()) throw new NoSuchElementException(); - Row cur0 = row; - row = null; - - return cur0; + return outBuff.remove(); } /** {@inheritDoc} */ @@ -236,61 +214,53 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** */ - private Row take() { + private void exchangeBuffers() { assert !F.isEmpty(sources()) && sources().size() == 1; lock.lock(); try { - while (true) { - checkCancelled(); - assert state == State.RUNNING; + while (ex.get() == null) { + assert outBuff.isEmpty(); - if (!buff.isEmpty()) - return buff.poll(); - else if (waiting == -1) - break; - else if (waiting == 0) { + if (inBuff.size() == IN_BUFFER_SIZE || waiting == -1) { + Deque<Row> tmp = inBuff; + inBuff = outBuff; + outBuff = tmp; + } + + if (waiting == -1) + close(); + else if (inBuff.isEmpty() && waiting == 0) { int req = waiting = IN_BUFFER_SIZE; context().execute(() -> source().request(req)); } + if (!outBuff.isEmpty() || waiting == -1) + break; + cond.await(); } - - state = State.END; } catch (InterruptedException e) { - throw new IgniteInterruptedException(e); + onError(new IgniteInterruptedException(e)); } finally { lock.unlock(); } - assert state == State.END; - - close(); - - return null; + checkException(); } /** */ - private void checkCancelled() { - if (state == State.CANCELLED) { - ex.compareAndSet(null, new IgniteSQLException("The query was cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED)); + private void checkException() { + Throwable e = ex.get(); - throw sqlException(ex.get()); - } - } + if (e == null) + return; - /** */ - private IgniteSQLException sqlException(Throwable e) { - return e instanceof IgniteSQLException - ? (IgniteSQLException)e - : new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e); - } - - /** */ - private enum State { - RUNNING, CANCELLED, END, CLOSED + if (e instanceof IgniteSQLException) + throw (IgniteSQLException)e; + else + throw new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 2886ad9..12448a9 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; @@ -407,6 +408,55 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { assertNull(row); } + + public void testThroughput() { + IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>() + .setCacheMode(CacheMode.REPLICATED) + .setName("developer") + .setSqlSchema("PUBLIC") + .setIndexedTypes(Integer.class, Developer.class) + .setBackups(2) + ); + + int numIterations = 1000; + + int prId = -1; + + for (int i = 0; i < 5000; i++) { + if (i % 1000 == 0) + prId++; + + developer.put(i, new Developer("Name" + i, prId)); + } + + QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class); + + // warmup + for (int i = 0; i < numIterations; i++) { + List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER"); + query.get(0).getAll(); + } + + long start = System.currentTimeMillis(); + for (int i = 0; i < numIterations; i++) { + List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER"); + query.get(0).getAll(); + } + System.out.println("Calcite duration = " + (System.currentTimeMillis() - start)); + + // warmup + for (int i = 0; i < numIterations; i++) { + List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false); + query.get(0).getAll(); + } + + start = System.currentTimeMillis(); + for (int i = 0; i < numIterations; i++) { + List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false); + query.get(0).getAll(); + } + System.out.println("H2 duration = " + (System.currentTimeMillis() - start)); + } /** */ public static class Key { diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java index 0ddac9e..48af61c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ignite.internal.processors.query.calcite; import java.util.Iterator; @@ -43,6 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static java.util.Collections.singletonList; +import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG; /** * Cancel query test. @@ -110,7 +112,7 @@ public class CancelTest extends GridCommonAbstractTest { return null; }, - IgniteSQLException.class, "The query was cancelled while executing" + IgniteSQLException.class, ERR_MSG ); awaitReservationsRelease("TEST");