This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 34db074f920 IGNITE-23542 MINOR: ScanQuery code clean up (#11620)
34db074f920 is described below
commit 34db074f9201f62fbb69913984eadbbc68d78ea2
Author: Nikolay <[email protected]>
AuthorDate: Fri Oct 25 16:45:29 2024 +0300
IGNITE-23542 MINOR: ScanQuery code clean up (#11620)
---
.../cache/query/GridCacheQueryAdapter.java | 35 +------
.../cache/query/GridCacheQueryManager.java | 114 ++++++++++++---------
.../datastructures/GridCacheSetImpl.java | 10 +-
3 files changed, 74 insertions(+), 85 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 5e7b7b77ee1..3ffb78b2c01 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -156,25 +156,10 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
boolean forceLocal,
Boolean dataPageScanEnabled
) {
- assert cctx != null;
- assert type != null;
- assert part == null || part >= 0;
+ this(cctx, type, null, null, filter, part, false, keepBinary,
dataPageScanEnabled, null);
- this.cctx = cctx;
- this.type = type;
- this.filter = filter;
this.transform = transform;
- this.part = part;
- this.keepBinary = keepBinary;
this.forceLocal = forceLocal;
- this.dataPageScanEnabled = dataPageScanEnabled;
-
- log = cctx.logger(getClass());
-
- this.incMeta = false;
- this.clsName = null;
- this.clause = null;
- this.idxQryDesc = null;
}
/**
@@ -199,7 +184,8 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
@Nullable Integer part,
boolean incMeta,
boolean keepBinary,
- Boolean dataPageScanEnabled
+ Boolean dataPageScanEnabled,
+ IndexQueryDesc idxQryDesc
) {
assert cctx != null;
assert type != null;
@@ -214,10 +200,9 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
this.incMeta = incMeta;
this.keepBinary = keepBinary;
this.dataPageScanEnabled = dataPageScanEnabled;
+ this.idxQryDesc = idxQryDesc;
log = cctx.logger(getClass());
-
- this.idxQryDesc = null;
}
/**
@@ -299,17 +284,7 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
@Nullable String clsName,
@Nullable IgniteBiPredicate<Object, Object> filter
) {
- this.cctx = cctx;
- this.type = type;
- this.clsName = clsName;
- this.idxQryDesc = idxQryDesc;
- this.filter = filter;
- this.part = part;
-
- log = cctx.logger(getClass());
-
- clause = null;
- incMeta = false;
+ this(cctx, type, clsName, null, filter, part, false, false, null,
idxQryDesc);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 48a0aaa2006..7dc28548194 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -795,15 +795,9 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?>
qry, IgniteClosure transformer,
boolean locNode)
throws IgniteCheckedException {
- final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
- final InternalScanFilter<K, V> intFilter = keyValFilter != null ? new
InternalScanFilter<>(keyValFilter) : null;
+ final InternalScanFilter<K, V> intFilter =
internalFilter(qry.scanFilter());
try {
- if (keyValFilter instanceof PlatformCacheEntryFilter)
- ((PlatformCacheEntryFilter)keyValFilter).cacheContext(cctx);
- else
- injectResources(keyValFilter);
-
Integer part = qry.partition();
if (part != null && (part < 0 || part >=
cctx.affinity().partitions()))
@@ -862,8 +856,8 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
}
ScanQueryIterator iter = new ScanQueryIterator(it, qry, topVer,
locPart,
- SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteBiPredicate.class, keyValFilter),
- SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteClosure.class, transformer),
+ intFilter,
+ prepareTransformer(transformer),
locNode, locNode ? locIters : null, cctx, log);
if (locNode) {
@@ -882,21 +876,51 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
}
}
+ /** */
+ private @Nullable IgniteClosure<?, ?> prepareTransformer(IgniteClosure<?,
?> transformer) throws IgniteCheckedException {
+ return SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteClosure.class, injectResources(transformer));
+ }
+
+ /** */
+ private @Nullable InternalScanFilter<K, V>
internalFilter(IgniteBiPredicate<K, V> keyValFilter) throws
IgniteCheckedException {
+ if (keyValFilter == null)
+ return null;
+
+ try {
+ if (keyValFilter instanceof PlatformCacheEntryFilter)
+ ((PlatformCacheEntryFilter)keyValFilter).cacheContext(cctx);
+ else
+ injectResources(keyValFilter);
+
+ keyValFilter = SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteBiPredicate.class, keyValFilter);
+
+ return new InternalScanFilter<>(keyValFilter);
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ InternalScanFilter.close(keyValFilter);
+
+ throw e;
+ }
+ }
+
/**
* @param o Object to inject resources to.
* @throws IgniteCheckedException If failure occurred while injecting
resources.
*/
- private void injectResources(@Nullable Object o) throws
IgniteCheckedException {
- if (o != null) {
- GridKernalContext ctx = cctx.kernalContext();
+ private <R> R injectResources(@Nullable R o) throws IgniteCheckedException
{
+ if (o == null)
+ return null;
- ClassLoader ldr = o.getClass().getClassLoader();
+ GridKernalContext ctx = cctx.kernalContext();
- if (ctx.deploy().isGlobalLoader(ldr))
-
ctx.resource().inject(ctx.deploy().getDeployment(ctx.deploy().getClassLoaderId(ldr)),
o.getClass(), o);
- else
-
ctx.resource().inject(ctx.deploy().getDeployment(o.getClass().getName()),
o.getClass(), o);
- }
+ ClassLoader ldr = o.getClass().getClassLoader();
+
+ if (ctx.deploy().isGlobalLoader(ldr))
+
ctx.resource().inject(ctx.deploy().getDeployment(ctx.deploy().getClassLoaderId(ldr)),
o.getClass(), o);
+ else
+
ctx.resource().inject(ctx.deploy().getDeployment(o.getClass().getName()),
o.getClass(), o);
+
+ return o;
}
/**
@@ -904,7 +928,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
*
* @param qryInfo Query info.
*/
- protected void runFieldsQuery(GridCacheQueryInfo qryInfo) {
+ protected void runFieldsQuery(final GridCacheQueryInfo qryInfo) {
assert qryInfo != null;
if (!enterBusy()) {
@@ -928,9 +952,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteReducer<Object, Object> rdc = (IgniteReducer<Object,
Object>)qryInfo.reducer();
-
- injectResources(rdc);
+ final IgniteReducer<Object, Object> rdc =
injectResources((IgniteReducer<Object, Object>)qryInfo.reducer());
GridCacheQueryAdapter<?> qry = qryInfo.query();
@@ -939,7 +961,9 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
Collection<Object> data = null;
Collection<Object> entities = null;
- if (qryInfo.local() || rdc != null ||
cctx.isLocalNode(qryInfo.senderId()))
+ boolean isWriteData = qryInfo.local() || rdc != null ||
cctx.isLocalNode(qryInfo.senderId());
+
+ if (isWriteData)
data = new ArrayList<>(pageSize);
else
entities = new ArrayList<>(pageSize);
@@ -1017,7 +1041,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
row));
}
- if ((qryInfo.local() || rdc != null ||
cctx.isLocalNode(qryInfo.senderId()))) {
+ if (isWriteData) {
// Reduce.
if (rdc != null) {
if (!rdc.collect(row))
@@ -1125,14 +1149,7 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
GridCacheQueryAdapter<?> qry = qryInfo.query();
try {
- // Preparing query closures.
- IgniteClosure<Cache.Entry<K, V>, Object> trans =
- (IgniteClosure<Cache.Entry<K, V>,
Object>)qryInfo.transformer();
-
- IgniteReducer<Cache.Entry<K, V>, Object> rdc =
(IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer();
-
- injectResources(trans);
- injectResources(rdc);
+ IgniteReducer<Cache.Entry<K, V>, Object> rdc =
injectResources((IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer());
int pageSize = qry.pageSize();
@@ -1143,10 +1160,9 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
IgniteSpiCloseableIterator iter;
GridCacheQueryType type;
- res = loc ?
- executeQuery(qry, trans, loc, taskName,
- recipient(qryInfo.senderId(), qryInfo.requestId())) :
- queryResult(qryInfo, taskName);
+ res = loc
+ ? executeQuery(qry, qryInfo.transformer(), loc, taskName,
recipient(qryInfo.senderId(), qryInfo.requestId()))
+ : queryResult(qryInfo, taskName);
if (res == null)
return;
@@ -1410,11 +1426,7 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
taskName));
}
- IgniteClosure transformer = qry.transform();
-
- injectResources(transformer);
-
- GridCloseableIterator it = scanIterator(qry, transformer, true);
+ GridCloseableIterator<?> it = scanIterator(qry, qry.transform(),
true);
updateStatistics = false;
@@ -2518,9 +2530,6 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
*
*/
private static class FieldsResult<Q> extends CachedResult<Q> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** */
private List<GridQueryFieldMetadata> meta;
@@ -2875,9 +2884,8 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
null,
null,
null,
- null,
- false,
keepBinary,
+ false,
null);
}
@@ -2949,6 +2957,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
null,
false,
keepBinary,
+ null,
null)
.limit(limit)
.pageSize(pageSize);
@@ -3115,7 +3124,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
* @param qry Query.
* @param topVer Topology version.
* @param locPart Local partition.
- * @param scanFilter Scan filter.
+ * @param intScanFilter Internal scan filter.
* @param transformer Transformer.
* @param locNode Local node flag.
* @param locIters Local iterators set.
@@ -3127,7 +3136,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
GridCacheQueryAdapter qry,
AffinityTopologyVersion topVer,
GridDhtLocalPartition locPart,
- IgniteBiPredicate<K, V> scanFilter,
+ InternalScanFilter<K, V> intScanFilter,
IgniteClosure transformer,
boolean locNode,
@Nullable GridConcurrentHashSet<ScanQueryIterator> locIters,
@@ -3138,7 +3147,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
this.it = it;
this.topVer = topVer;
this.locPart = locPart;
- this.intScanFilter = scanFilter != null ? new
InternalScanFilter<>(scanFilter) : null;
+ this.intScanFilter = intScanFilter;
this.cctx = cctx;
this.log = log;
@@ -3157,7 +3166,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
subjId = securitySubjectId(cctx);
// keep binary for remote scans if possible
- keepBinary = (!locNode && scanFilter == null && transformer ==
null && !readEvt) || qry.keepBinary();
+ keepBinary = (!locNode && intScanFilter == null && transformer ==
null && !readEvt) || qry.keepBinary();
transform = transformer;
dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
cache = dht != null ? dht : cctx.cache();
@@ -3427,6 +3436,11 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
/** */
void close() {
+ close(scanFilter);
+ }
+
+ /** */
+ static void close(IgniteBiPredicate<?, ?> scanFilter) {
if (scanFilter instanceof PlatformCacheEntryFilter)
((PlatformCacheEntryFilter)scanFilter).onClose();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 8d95044f186..05ac4b38826 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -165,8 +165,8 @@ public class GridCacheSetImpl<T> extends
AbstractCollection<T> implements Ignite
return cache.sizeAsync(new CachePeekMode[] {}).get() - 1;
}
- CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), collocated ?
hdrPart : null,
+ CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET,
+ new GridSetQueryPredicate<>(id, collocated), null, collocated
? hdrPart : null,
false, false, null);
Collection<ClusterNode> nodes =
dataNodes(ctx.affinity().affinityTopologyVersion());
@@ -440,9 +440,9 @@ public class GridCacheSetImpl<T> extends
AbstractCollection<T> implements Ignite
*/
@SuppressWarnings("unchecked")
private WeakReferenceCloseableIterator<T> sharedCacheIterator(boolean
keepBinary) throws IgniteCheckedException {
- CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart
: null,
- false, keepBinary, null);
+ CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET,
+ new GridSetQueryPredicate<>(id, collocated), null, collocated ?
hdrPart : null,
+ keepBinary, false, null);
Collection<ClusterNode> nodes =
dataNodes(ctx.affinity().affinityTopologyVersion());