This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4a8ed5f78cf IGNITE-21866 SQL Calcite: Add memory quotas control for
Cursor.getAll() method - Fixes #11288.
4a8ed5f78cf is described below
commit 4a8ed5f78cf587e7db7bc42a1d2789f3cd37dade
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu May 30 10:44:03 2024 +0300
IGNITE-21866 SQL Calcite: Add memory quotas control for Cursor.getAll()
method - Fixes #11288.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../internal/processors/query/calcite/Query.java | 7 ++--
.../query/calcite/exec/ExecutionContext.java | 7 +---
.../query/calcite/exec/ExecutionServiceImpl.java | 13 +++++--
.../exec/tracker/ExecutionNodeMemoryTracker.java | 10 +++++-
.../calcite/exec/tracker/ObjectSizeCalculator.java | 9 +++++
.../calcite/exec/tracker/QueryMemoryTracker.java | 8 ++++-
.../query/calcite/util/ListFieldsQueryCursor.java | 28 +++++++++++++--
.../processors/query/calcite/QueryChecker.java | 19 +++++++++-
.../integration/MemoryQuotasIntegrationTest.java | 40 ++++++++++++++++++++++
9 files changed, 122 insertions(+), 19 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
index 815b593cde2..c6915fbe4b7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -34,7 +34,6 @@ import
org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
-import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -261,10 +260,8 @@ public class Query<RowT> {
synchronized (mux) {
// Query can have multiple fragments, each fragment requests
memory tracker, but there should be only
// one memory tracker per query on each node, store it inside
Query instance.
- if (memoryTracker == null) {
- memoryTracker = quota > 0 || globalMemoryTracker !=
NoOpMemoryTracker.INSTANCE ?
- new QueryMemoryTracker(globalMemoryTracker, quota) :
NoOpMemoryTracker.INSTANCE;
- }
+ if (memoryTracker == null)
+ memoryTracker = QueryMemoryTracker.create(globalMemoryTracker,
quota);
return memoryTracker;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 39a865ea1ba..8627eb8e37c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -35,8 +35,6 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFa
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
-import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
-import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpRowTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -344,10 +342,7 @@ public class ExecutionContext<Row> extends
AbstractQueryContext implements DataC
/** */
public <R> RowTracker<R> createNodeMemoryTracker(long rowOverhead) {
- if (qryMemoryTracker == NoOpMemoryTracker.INSTANCE)
- return NoOpRowTracker.instance();
- else
- return new ExecutionNodeMemoryTracker<R>(qryMemoryTracker,
rowOverhead);
+ return ExecutionNodeMemoryTracker.create(qryMemoryTracker,
rowOverhead);
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 56ac14c372f..20d6c146f31 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -71,6 +71,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTr
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.PerformanceStatisticsIoTracker;
+import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -611,6 +612,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
execPlan.target(fragment),
execPlan.remotes(fragment));
+ MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+
ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
taskExecutor(),
@@ -620,7 +623,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
mapCtx.topologyVersion(),
fragmentDesc,
handler,
- qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
+ qryMemoryTracker,
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
qryParams);
@@ -752,7 +755,13 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
Iterator<List<?>> it = new
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
fieldConverter, rowConverter, onClose);
- return new ListFieldsQueryCursor<>(plan, it, ectx);
+ // Make yet another tracking layer for cursor.getAll(), so tracking
hierarchy will look like:
+ // Row tracker -> Cursor memory tracker -> Query memory tracker ->
Global memory tracker.
+ // It's required, since query memory tracker can be closed
concurrently during getAll() and
+ // tracked data for cursor can be lost without additional tracker.
+ MemoryTracker curMemoryTracker =
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
+
+ return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker);
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
index 1548606b2e2..81866044e5e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java
@@ -39,8 +39,16 @@ public class ExecutionNodeMemoryTracker<Row> implements
RowTracker<Row> {
/** */
private final long rowOverhead;
+ /** Factory method. */
+ public static <T> RowTracker<T> create(MemoryTracker qryMemoryTracker,
long rowOverhead) {
+ if (qryMemoryTracker == NoOpMemoryTracker.INSTANCE)
+ return NoOpRowTracker.instance();
+ else
+ return new ExecutionNodeMemoryTracker<>(qryMemoryTracker,
rowOverhead);
+ }
+
/** */
- public ExecutionNodeMemoryTracker(MemoryTracker qryMemoryTracker, long
rowOverhead) {
+ ExecutionNodeMemoryTracker(MemoryTracker qryMemoryTracker, long
rowOverhead) {
this.qryMemoryTracker = qryMemoryTracker;
this.rowOverhead = rowOverhead;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
index 00040a85636..515b969c94a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
@@ -125,6 +125,15 @@ public class ObjectSizeCalculator<Row> {
long intArrOffset = GridUnsafe.arrayBaseOffset(int[].class);
addSysClsSize(BigInteger.class, (c, bi) -> align( intArrOffset +
((bi.bitLength() + 31) >> 5) << 2));
SYS_CLS_SIZE.put(Class.class, (c, v) -> 0);
+ long objArrOffset = GridUnsafe.arrayBaseOffset(Object[].class);
+ addSysClsSize(ArrayList.class, (c, l) -> {
+ long res = align(objArrOffset + l.size() * OBJ_REF_SIZE);
+
+ for (int i = 0; i < l.size(); i++)
+ res += c.sizeOf0(l.get(i), true);
+
+ return res;
+ });
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
index 2b131a0a45e..44d8480366d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java
@@ -35,8 +35,14 @@ public class QueryMemoryTracker implements MemoryTracker {
/** Currently allocated. */
private final AtomicLong allocated = new AtomicLong();
+ /** Factory method. */
+ public static MemoryTracker create(MemoryTracker parent, long quota) {
+ return quota > 0 || parent != NoOpMemoryTracker.INSTANCE ?
+ new QueryMemoryTracker(parent, quota) : NoOpMemoryTracker.INSTANCE;
+ }
+
/** */
- public QueryMemoryTracker(MemoryTracker parent, long quota) {
+ QueryMemoryTracker(MemoryTracker parent, long quota) {
this.parent = parent;
this.quota = quota;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
index a3885a6f9b9..0344fc72d6e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
@@ -25,6 +25,10 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
+import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
+import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.ObjectSizeCalculator;
+import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
import
org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
import
org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
@@ -41,6 +45,9 @@ public class ListFieldsQueryCursor<Row> implements
FieldsQueryCursor<List<?>>, Q
/** */
private final List<GridQueryFieldMetadata> fieldsMeta;
+ /** */
+ private final MemoryTracker qryMemoryTracker;
+
/** */
private final boolean isQry;
@@ -48,13 +55,19 @@ public class ListFieldsQueryCursor<Row> implements
FieldsQueryCursor<List<?>>, Q
* @param plan Query plan.
* @param it Iterator.
* @param ectx Row converter.
+ * @param qryMemoryTracker Query memory tracker.
*/
- public ListFieldsQueryCursor(MultiStepPlan plan, Iterator<List<?>> it,
ExecutionContext<Row> ectx) {
+ public ListFieldsQueryCursor(
+ MultiStepPlan plan,
+ Iterator<List<?>> it,
+ ExecutionContext<Row> ectx,
+ MemoryTracker qryMemoryTracker
+ ) {
FieldsMetadata metadata0 = plan.fieldsMetadata();
assert metadata0 != null;
fieldsMeta = metadata0.queryFieldsMetadata(ectx.getTypeFactory());
isQry = plan.type() == QueryPlan.Type.QUERY;
-
+ this.qryMemoryTracker = qryMemoryTracker;
this.it = it;
}
@@ -67,12 +80,21 @@ public class ListFieldsQueryCursor<Row> implements
FieldsQueryCursor<List<?>>, Q
@Override public List<List<?>> getAll() {
ArrayList<List<?>> res = new ArrayList<>();
+ RowTracker<List<?>> rowTracker =
ExecutionNodeMemoryTracker.create(qryMemoryTracker,
+ ObjectSizeCalculator.OBJ_REF_SIZE);
+
try {
- getAll(res::add);
+ getAll(row -> {
+ rowTracker.onRowAdded(row);
+ res.add(row);
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ finally {
+ qryMemoryTracker.reset();
+ }
return res;
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index cd390f0287b..19f12244783 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -287,6 +287,9 @@ public abstract class QueryChecker {
/** */
private boolean ordered;
+ /** */
+ private boolean withRowsIterator;
+
/** */
private Object[] params = X.EMPTY_OBJECT_ARRAY;
@@ -305,6 +308,13 @@ public abstract class QueryChecker {
return this;
}
+ /** */
+ public QueryChecker withRowsIterator(boolean flag) {
+ withRowsIterator = flag;
+
+ return this;
+ }
+
/** */
public QueryChecker withParams(Object... params) {
this.params = params;
@@ -388,7 +398,14 @@ public abstract class QueryChecker {
assertThat("Column names don't match", colNames,
equalTo(expectedColumnNames));
}
- List<List<?>> res = cur.getAll();
+ List<List<?>> res;
+ if (withRowsIterator) {
+ res = new ArrayList<>();
+ for (Iterator<List<?>> it = cur.iterator(); it.hasNext(); )
+ res.add(it.next());
+ }
+ else
+ res = cur.getAll();
if (expectedResultSize >= 0)
assertEquals("Unexpected result size", expectedResultSize,
res.size());
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
index f5c684e59cf..99cd26fd9c2 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java
@@ -400,4 +400,44 @@ public class MemoryQuotasIntegrationTest extends
AbstractBasicIntegrationTest {
}
}
}
+
+ /** */
+ @Test
+ public void testGetAll() {
+ // getAll for 800 rows.
+ assertQuery("SELECT id, b FROM tbl WHERE id < 800")
+ .withRowsIterator(false)
+ .resultSize(800)
+ .check();
+
+ // getAll + collect for 1000 rows.
+ assertThrows("SELECT id, b FROM tbl",
+ IgniteException.class, "Query quota exceeded");
+
+ // Collect for 800 rows.
+ assertQuery("SELECT ARRAY(SELECT b FROM tbl WHERE id < 800)")
+ .resultSize(1)
+ .check();
+
+ // getAll + collect for 400 rows.
+ assertQuery("SELECT ARRAY(SELECT b FROM tbl WHERE id < 400)")
+ .withRowsIterator(false)
+ .resultSize(1)
+ .check();
+
+ // getAll + collect for 800 rows.
+ assertThrows("SELECT ARRAY(SELECT b FROM tbl WHERE id < 800)",
+ IgniteException.class, "Query quota exceeded");
+
+ // getAll + sort for 800 rows (sort node releases memory after passing
rows to iterator).
+ assertQuery("SELECT id, b FROM tbl WHERE id < 800 ORDER BY id")
+ .withRowsIterator(false)
+ .resultSize(800)
+ .check();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected QueryChecker assertQuery(String qry) {
+ return super.assertQuery(qry).withRowsIterator(true);
+ }
}