This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a8ed5f78cf IGNITE-21866 SQL Calcite: Add memory quotas control for Cursor.getAll() method - Fixes #11288.
4a8ed5f78cf is described below

commit 4a8ed5f78cf587e7db7bc42a1d2789f3cd37dade
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu May 30 10:44:03 2024 +0300

    IGNITE-21866 SQL Calcite: Add memory quotas control for Cursor.getAll() method - Fixes #11288.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../internal/processors/query/calcite/Query.java   |  7 ++--
 .../query/calcite/exec/ExecutionContext.java       |  7 +---
 .../query/calcite/exec/ExecutionServiceImpl.java   | 13 +++++--
 .../exec/tracker/ExecutionNodeMemoryTracker.java   | 10 +++++-
 .../calcite/exec/tracker/ObjectSizeCalculator.java |  9 +++++
 .../calcite/exec/tracker/QueryMemoryTracker.java   |  8 ++++-
 .../query/calcite/util/ListFieldsQueryCursor.java  | 28 +++++++++++++--
 .../processors/query/calcite/QueryChecker.java     | 19 +++++++++-
 .../integration/MemoryQuotasIntegrationTest.java   | 40 ++++++++++++++++++++++
 9 files changed, 122 insertions(+), 19 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
index 815b593cde2..c6915fbe4b7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -34,7 +34,6 @@ import 
org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
-import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
@@ -261,10 +260,8 @@ public class Query<RowT> {
         synchronized (mux) {
             // Query can have multiple fragments, each fragment requests 
memory tracker, but there should be only
             // one memory tracker per query on each node, store it inside 
Query instance.
-            if (memoryTracker == null) {
-                memoryTracker = quota > 0 || globalMemoryTracker != 
NoOpMemoryTracker.INSTANCE ?
-                    new QueryMemoryTracker(globalMemoryTracker, quota) : 
NoOpMemoryTracker.INSTANCE;
-            }
+            if (memoryTracker == null)
+                memoryTracker = QueryMemoryTracker.create(globalMemoryTracker, 
quota);
 
             return memoryTracker;
         }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 39a865ea1ba..8627eb8e37c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -35,8 +35,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFa
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
-import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
-import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpRowTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -344,10 +342,7 @@ public class ExecutionContext<Row> extends 
AbstractQueryContext implements DataC
 
     /** */
     public <R> RowTracker<R> createNodeMemoryTracker(long rowOverhead) {
-        if (qryMemoryTracker == NoOpMemoryTracker.INSTANCE)
-            return NoOpRowTracker.instance();
-        else
-            return new ExecutionNodeMemoryTracker<R>(qryMemoryTracker, 
rowOverhead);
+        return ExecutionNodeMemoryTracker.create(qryMemoryTracker, 
rowOverhead);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 56ac14c372f..20d6c146f31 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -71,6 +71,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTr
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.PerformanceStatisticsIoTracker;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -611,6 +612,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
             execPlan.target(fragment),
             execPlan.remotes(fragment));
 
+        MemoryTracker qryMemoryTracker = 
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+
         ExecutionContext<Row> ectx = new ExecutionContext<>(
             qry.context(),
             taskExecutor(),
@@ -620,7 +623,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
             mapCtx.topologyVersion(),
             fragmentDesc,
             handler,
-            qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
+            qryMemoryTracker,
             createIoTracker(locNodeId, qry.localQueryId()),
             timeout,
             qryParams);
@@ -752,7 +755,13 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         Iterator<List<?>> it = new 
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
             fieldConverter, rowConverter, onClose);
 
-        return new ListFieldsQueryCursor<>(plan, it, ectx);
+        // Make yet another tracking layer for cursor.getAll(), so tracking 
hierarchy will look like:
+        // Row tracker -> Cursor memory tracker -> Query memory tracker -> 
Global memory tracker.
+        // It's required, since query memory tracker can be closed 
concurrently during getAll() and
+        // tracked data for cursor can be lost without additional tracker.
+        MemoryTracker curMemoryTracker = 
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
+
+        return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
index 1548606b2e2..81866044e5e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
@@ -39,8 +39,16 @@ public class ExecutionNodeMemoryTracker<Row> implements 
RowTracker<Row> {
     /** */
     private final long rowOverhead;
 
+    /** Factory method. */
+    public static <T> RowTracker<T> create(MemoryTracker qryMemoryTracker, 
long rowOverhead) {
+        if (qryMemoryTracker == NoOpMemoryTracker.INSTANCE)
+            return NoOpRowTracker.instance();
+        else
+            return new ExecutionNodeMemoryTracker<>(qryMemoryTracker, 
rowOverhead);
+    }
+
     /** */
-    public ExecutionNodeMemoryTracker(MemoryTracker qryMemoryTracker, long 
rowOverhead) {
+    ExecutionNodeMemoryTracker(MemoryTracker qryMemoryTracker, long 
rowOverhead) {
         this.qryMemoryTracker = qryMemoryTracker;
         this.rowOverhead = rowOverhead;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
index 00040a85636..515b969c94a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
@@ -125,6 +125,15 @@ public class ObjectSizeCalculator<Row> {
         long intArrOffset = GridUnsafe.arrayBaseOffset(int[].class);
         addSysClsSize(BigInteger.class, (c, bi) -> align( intArrOffset + 
((bi.bitLength() + 31) >> 5) << 2));
         SYS_CLS_SIZE.put(Class.class, (c, v) -> 0);
+        long objArrOffset = GridUnsafe.arrayBaseOffset(Object[].class);
+        addSysClsSize(ArrayList.class, (c, l) -> {
+            long res = align(objArrOffset + l.size() * OBJ_REF_SIZE);
+
+            for (int i = 0; i < l.size(); i++)
+                res += c.sizeOf0(l.get(i), true);
+
+            return res;
+        });
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
index 2b131a0a45e..44d8480366d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
@@ -35,8 +35,14 @@ public class QueryMemoryTracker implements MemoryTracker {
     /** Currently allocated. */
     private final AtomicLong allocated = new AtomicLong();
 
+    /** Factory method. */
+    public static MemoryTracker create(MemoryTracker parent, long quota) {
+        return quota > 0 || parent != NoOpMemoryTracker.INSTANCE ?
+            new QueryMemoryTracker(parent, quota) : NoOpMemoryTracker.INSTANCE;
+    }
+
     /** */
-    public QueryMemoryTracker(MemoryTracker parent, long quota) {
+    QueryMemoryTracker(MemoryTracker parent, long quota) {
         this.parent = parent;
         this.quota = quota;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
index a3885a6f9b9..0344fc72d6e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
@@ -25,6 +25,10 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.ObjectSizeCalculator;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
@@ -41,6 +45,9 @@ public class ListFieldsQueryCursor<Row> implements 
FieldsQueryCursor<List<?>>, Q
     /** */
     private final List<GridQueryFieldMetadata> fieldsMeta;
 
+    /** */
+    private final MemoryTracker qryMemoryTracker;
+
     /** */
     private final boolean isQry;
 
@@ -48,13 +55,19 @@ public class ListFieldsQueryCursor<Row> implements 
FieldsQueryCursor<List<?>>, Q
      * @param plan Query plan.
      * @param it Iterator.
      * @param ectx Row converter.
+     * @param qryMemoryTracker Query memory tracker.
      */
-    public ListFieldsQueryCursor(MultiStepPlan plan, Iterator<List<?>> it, 
ExecutionContext<Row> ectx) {
+    public ListFieldsQueryCursor(
+        MultiStepPlan plan,
+        Iterator<List<?>> it,
+        ExecutionContext<Row> ectx,
+        MemoryTracker qryMemoryTracker
+    ) {
         FieldsMetadata metadata0 = plan.fieldsMetadata();
         assert metadata0 != null;
         fieldsMeta = metadata0.queryFieldsMetadata(ectx.getTypeFactory());
         isQry = plan.type() == QueryPlan.Type.QUERY;
-
+        this.qryMemoryTracker = qryMemoryTracker;
         this.it = it;
     }
 
@@ -67,12 +80,21 @@ public class ListFieldsQueryCursor<Row> implements 
FieldsQueryCursor<List<?>>, Q
     @Override public List<List<?>> getAll() {
         ArrayList<List<?>> res = new ArrayList<>();
 
+        RowTracker<List<?>> rowTracker = 
ExecutionNodeMemoryTracker.create(qryMemoryTracker,
+            ObjectSizeCalculator.OBJ_REF_SIZE);
+
         try {
-            getAll(res::add);
+            getAll(row -> {
+                rowTracker.onRowAdded(row);
+                res.add(row);
+            });
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        finally {
+            qryMemoryTracker.reset();
+        }
 
         return res;
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index cd390f0287b..19f12244783 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -287,6 +287,9 @@ public abstract class QueryChecker {
     /** */
     private boolean ordered;
 
+    /** */
+    private boolean withRowsIterator;
+
     /** */
     private Object[] params = X.EMPTY_OBJECT_ARRAY;
 
@@ -305,6 +308,13 @@ public abstract class QueryChecker {
         return this;
     }
 
+    /** */
+    public QueryChecker withRowsIterator(boolean flag) {
+        withRowsIterator = flag;
+
+        return this;
+    }
+
     /** */
     public QueryChecker withParams(Object... params) {
         this.params = params;
@@ -388,7 +398,14 @@ public abstract class QueryChecker {
             assertThat("Column names don't match", colNames, 
equalTo(expectedColumnNames));
         }
 
-        List<List<?>> res = cur.getAll();
+        List<List<?>> res;
+        if (withRowsIterator) {
+            res = new ArrayList<>();
+            for (Iterator<List<?>> it = cur.iterator(); it.hasNext(); )
+                res.add(it.next());
+        }
+        else
+            res = cur.getAll();
 
         if (expectedResultSize >= 0)
             assertEquals("Unexpected result size", expectedResultSize, 
res.size());
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
index f5c684e59cf..99cd26fd9c2 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
@@ -400,4 +400,44 @@ public class MemoryQuotasIntegrationTest extends 
AbstractBasicIntegrationTest {
             }
         }
     }
+
+    /** */
+    @Test
+    public void testGetAll() {
+        // getAll for 800 rows.
+        assertQuery("SELECT id, b FROM tbl WHERE id < 800")
+            .withRowsIterator(false)
+            .resultSize(800)
+            .check();
+
+        // getAll + collect for 1000 rows.
+        assertThrows("SELECT id, b FROM tbl",
+            IgniteException.class, "Query quota exceeded");
+
+        // Collect for 800 rows.
+        assertQuery("SELECT ARRAY(SELECT b FROM tbl WHERE id < 800)")
+            .resultSize(1)
+            .check();
+
+        // getAll + collect for 400 rows.
+        assertQuery("SELECT ARRAY(SELECT b FROM tbl WHERE id < 400)")
+            .withRowsIterator(false)
+            .resultSize(1)
+            .check();
+
+        // getAll + collect for 800 rows.
+        assertThrows("SELECT ARRAY(SELECT b FROM tbl WHERE id < 800)",
+            IgniteException.class, "Query quota exceeded");
+
+        // getAll + sort for 800 rows (sort node release memory after passing 
rows to iterator).
+        assertQuery("SELECT id, b FROM tbl WHERE id < 800 ORDER BY id")
+            .withRowsIterator(false)
+            .resultSize(800)
+            .check();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected QueryChecker assertQuery(String qry) {
+        return super.assertQuery(qry).withRowsIterator(true);
+    }
 }

Reply via email to