This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c1f1f301413 IGNITE-23650 SQL Calcite: Refactor IndexScan/TableScan
classes hierarchy - Fixes #11650.
c1f1f301413 is described below
commit c1f1f3014137bc07d7a88b271bb67b4d995bb9e4
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Nov 21 09:37:29 2024 +0300
IGNITE-23650 SQL Calcite: Refactor IndexScan/TableScan classes hierarchy -
Fixes #11650.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../calcite/exec/AbstractCacheColumnsScan.java | 55 ++++
.../query/calcite/exec/AbstractCacheScan.java | 156 ++++++++++++
.../query/calcite/exec/IndexFirstLastScan.java | 22 +-
.../processors/query/calcite/exec/IndexScan.java | 278 ++++++---------------
.../query/calcite/exec/RuntimeHashIndex.java | 7 +-
.../query/calcite/exec/RuntimeSortedIndex.java | 41 +--
.../processors/query/calcite/exec/TableScan.java | 138 +---------
.../processors/query/calcite/exec/TreeIndex.java | 4 +-
...stractIndexScan.java => TreeIndexIterable.java} | 69 +----
.../calcite/exec/exp/TransformRangeIterable.java | 88 +++++++
.../query/calcite/exec/RuntimeSortedIndexTest.java | 2 +-
11 files changed, 423 insertions(+), 437 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
new file mode 100644
index 00000000000..c5966bf9f54
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public abstract class AbstractCacheColumnsScan<Row> extends
AbstractCacheScan<Row> {
+ /** */
+ protected final CacheTableDescriptor desc;
+
+ /** */
+ protected final RowFactory<Row> factory;
+
+ /** */
+ protected final RelDataType rowType;
+
+ /** Participating columns. */
+ protected final ImmutableBitSet requiredColumns;
+
+ /** */
+ AbstractCacheColumnsScan(
+ ExecutionContext<Row> ectx,
+ CacheTableDescriptor desc,
+ int[] parts,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ super(ectx, desc.cacheContext(), parts);
+
+ this.desc = desc;
+ this.requiredColumns = requiredColumns;
+
+ rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);
+ factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
new file mode 100644
index 00000000000..b35290e1bd9
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public abstract class AbstractCacheScan<Row> implements Iterable<Row>,
AutoCloseable {
+ /** */
+ protected final GridCacheContext<?, ?> cctx;
+
+ /** */
+ protected final ExecutionContext<Row> ectx;
+
+ /** */
+ protected final AffinityTopologyVersion topVer;
+
+ /** */
+ protected final int[] parts;
+
+ /** */
+ protected volatile List<GridDhtLocalPartition> reserved;
+
+ /** */
+ AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx,
int[] parts) {
+ this.ectx = ectx;
+ this.cctx = cctx;
+ this.parts = parts;
+
+ topVer = ectx.topologyVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final Iterator<Row> iterator() {
+ reserve();
+
+ try {
+ return createIterator();
+ }
+ catch (Exception e) {
+ release();
+
+ throw e;
+ }
+ }
+
+ /** Creates rows iterator. Invoked by {@link #iterator()} after partitions are reserved. */
+ protected abstract Iterator<Row> createIterator();
+
+ /** */
+ @Override public void close() {
+ release();
+ }
+
+ /** */
+ private synchronized void reserve() {
+ if (reserved != null)
+ return;
+
+ GridDhtPartitionTopology top = cctx.topology();
+
+ top.readLock();
+
+ GridDhtTopologyFuture topFut = top.topologyVersionFuture();
+
+ boolean done = topFut.isDone();
+
+ if (!done || !(topFut.topologyVersion().compareTo(topVer) >= 0
+ &&
cctx.shared().exchange().lastAffinityChangedTopologyVersion(topFut.initialVersion()).compareTo(topVer)
<= 0)) {
+ top.readUnlock();
+
+ throw new ClusterTopologyException("Topology was changed. Please
retry on stable topology.");
+ }
+
+ List<GridDhtLocalPartition> toReserve;
+
+ if (cctx.isReplicated()) {
+ int partsCnt = cctx.affinity().partitions();
+
+ toReserve = new ArrayList<>(partsCnt);
+
+ for (int i = 0; i < partsCnt; i++)
+ toReserve.add(top.localPartition(i));
+ }
+ else if (cctx.isPartitioned()) {
+ assert parts != null;
+
+ toReserve = new ArrayList<>(parts.length);
+
+ for (int i = 0; i < parts.length; i++)
+ toReserve.add(top.localPartition(parts[i]));
+ }
+ else
+ toReserve = Collections.emptyList();
+
+ reserved = new ArrayList<>(toReserve.size());
+
+ try {
+ for (GridDhtLocalPartition part : toReserve) {
+ if (part == null || !part.reserve())
+ throw new ClusterTopologyException("Failed to reserve
partition for query execution. Retry on stable topology.");
+ else if (part.state() != GridDhtPartitionState.OWNING) {
+ part.release();
+
+ throw new ClusterTopologyException("Failed to reserve
partition for query execution. Retry on stable topology.");
+ }
+
+ reserved.add(part);
+ }
+ }
+ catch (Exception e) {
+ release();
+
+ throw e;
+ }
+ finally {
+ top.readUnlock();
+ }
+ }
+
+ /** */
+ private synchronized void release() {
+ if (F.isEmpty(reserved))
+ return;
+
+ reserved.forEach(GridDhtLocalPartition::release);
+
+ reserved = null;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
index bcc656d8602..ae6fea8dfc2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.util.lang.GridCursor;
@@ -31,6 +32,9 @@ import org.jetbrains.annotations.Nullable;
* Takes only first or last index value excluding nulls.
*/
public class IndexFirstLastScan<Row> extends IndexScan<Row> {
+ /** First or last field value. */
+ private final boolean first;
+
/**
* @param first {@code True} to take first index value. {@code False} to
take last value.
* @param ectx Execution context.
@@ -49,7 +53,14 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row> {
int[] parts,
@Nullable ImmutableBitSet requiredColumns
) {
- super(ectx, desc, new FirstLastIndexWrapper(idx, first),
idxFieldMapping, parts, null, requiredColumns);
+ super(ectx, desc, idx, idxFieldMapping, parts, null, requiredColumns);
+
+ this.first = first;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TreeIndex<IndexRow> treeIndex() {
+ return new FirstLastIndexWrapper(idx, indexQueryContext(), first);
}
/** {@inheritDoc} */
@@ -69,10 +80,12 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row>
{
/**
* @param idx Index
+ * @param qctx Query context.
* @param first {@code True} to take first index value. {@code False}
to take last value.
*/
- protected FirstLastIndexWrapper(InlineIndexImpl idx, boolean first) {
- super(idx);
+ protected FirstLastIndexWrapper(InlineIndex idx, IndexQueryContext
qctx, boolean first) {
+ super(idx, qctx);
+
this.first = first;
}
@@ -81,8 +94,7 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row> {
IndexRow lower,
IndexRow upper,
boolean lowerInclude,
- boolean upperInclude,
- IndexQueryContext qctx
+ boolean upperInclude
) {
assert lower == null && upper == null;
assert lowerInclude && upperInclude;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index e30974f50d9..f842c12368e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -27,7 +25,6 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.IndexPlainRowImpl;
@@ -42,17 +39,11 @@ import
org.apache.ignite.internal.cache.query.index.sorted.inline.SortedSegmente
import org.apache.ignite.internal.cache.query.index.sorted.inline.io.InlineIO;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
import
org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
+import
org.apache.ignite.internal.processors.query.calcite.exec.exp.TransformRangeIterable;
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
@@ -67,30 +58,12 @@ import org.jetbrains.annotations.Nullable;
/**
* Scan on index.
*/
-public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
+public class IndexScan<Row> extends AbstractCacheColumnsScan<Row> {
/** */
private final GridKernalContext kctx;
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final CacheTableDescriptor desc;
-
- /** */
- private final RowFactory<Row> factory;
-
- /** */
- private final AffinityTopologyVersion topVer;
-
- /** */
- private final int[] parts;
-
- /** */
- private volatile List<GridDhtLocalPartition> reserved;
-
- /** */
- private final ImmutableBitSet requiredColumns;
+ /** Index scan bounds. */
+ private final RangeIterable<Row> ranges;
/** */
protected final InlineIndex idx;
@@ -126,41 +99,12 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
RangeIterable<Row> ranges,
@Nullable ImmutableBitSet requiredColumns
) {
- this(ectx, desc, new TreeIndexWrapper(idx), idxFieldMapping, parts,
ranges, requiredColumns);
- }
+ super(ectx, desc, parts, requiredColumns);
+ this.ranges = ranges;
- /**
- * @param ectx Execution context.
- * @param desc Table descriptor.
- * @param idxFieldMapping Mapping from index keys to row fields.
- * @param treeIdx Physical index wrapper.
- * @param ranges Index scan bounds.
- */
- protected IndexScan(
- ExecutionContext<Row> ectx,
- CacheTableDescriptor desc,
- TreeIndexWrapper treeIdx,
- ImmutableIntList idxFieldMapping,
- int[] parts,
- RangeIterable<Row> ranges,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- super(
- ectx,
- desc.rowType(ectx.getTypeFactory(), requiredColumns),
- treeIdx,
- ranges
- );
-
- this.desc = desc;
- this.idx = treeIdx.idx;
- cctx = desc.cacheContext();
+ this.idx = idx;
kctx = cctx.kernalContext();
- factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
- topVer = ectx.topologyVersion();
- this.parts = parts;
- this.requiredColumns = requiredColumns;
this.idxFieldMapping = idxFieldMapping;
RelDataType srcRowType = desc.rowType(ectx.getTypeFactory(), null);
@@ -185,7 +129,6 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
}
else
txChanges = null;
-
}
/**
@@ -235,47 +178,24 @@ public class IndexScan<Row> extends
AbstractIndexScan<Row, IndexRow> {
}
/** {@inheritDoc} */
- @Override public synchronized Iterator<Row> iterator() {
- reserve();
+ @Override protected Iterator<Row> createIterator() {
+ RangeIterable<IndexRow> ranges0 = ranges == null ? null : new
TransformRangeIterable<>(ranges, this::row2indexRow);
- try {
- return super.iterator();
- }
- catch (Exception e) {
- release();
+ TreeIndex<IndexRow> treeIdx = treeIndex();
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected GridCursor<IndexRow> indexCursor(IndexRow lower,
IndexRow upper, boolean lowerInclude, boolean upperInclude) {
- GridCursor<IndexRow> idxCursor = super.indexCursor(lower, upper,
lowerInclude, upperInclude);
+ if (txChanges != null)
+ treeIdx = new TxAwareTreeIndexWrapper(treeIdx);
- if (txChanges == null)
- return idxCursor;
+ return F.iterator(new TreeIndexIterable<>(treeIdx, ranges0),
this::indexRow2Row, true);
+ }
- // `txChanges` returns single thread data structures e.g. `HashSet`,
`ArrayList`.
- // It safe to use them in multiple `FilteredCursor` instances,
because, multi range index scan will be flat to the single cursor.
- // See AbstractIndexScan#iterator.
- try {
- return new SortedSegmentedIndexCursor(
- new GridCursor[]{
- // This call will change `txChanges.get1()` content.
- // Removing found key from set more efficient so we break
some rules here.
- new KeyFilteringCursor<>(idxCursor, txChanges.get1(), r ->
r.cacheDataRow().key()),
- new SortedListRangeCursor<>(this::compare,
txChanges.get2(), lower, upper, lowerInclude, upperInclude)
- },
- idx.indexDefinition()
- );
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ /** */
+ protected TreeIndex<IndexRow> treeIndex() {
+ return new TreeIndexWrapper(idx, indexQueryContext());
}
- /** {@inheritDoc} */
- @Override protected IndexRow row2indexRow(Row bound) {
+ /** Converts a Row bound to an IndexRow. */
+ protected IndexRow row2indexRow(Row bound) {
if (bound == null)
return null;
@@ -306,12 +226,17 @@ public class IndexScan<Row> extends
AbstractIndexScan<Row, IndexRow> {
return nullSearchRow ? null : new IndexPlainRowImpl(keys, idxRowHnd);
}
- /** {@inheritDoc} */
- @Override protected Row indexRow2Row(IndexRow row) throws
IgniteCheckedException {
- if (row.indexPlainRow())
- return inlineIndexRow2Row(row);
- else
- return desc.toRow(ectx, row.cacheDataRow(), factory,
requiredColumns);
+ /** Converts an IndexRow to a Row. */
+ protected Row indexRow2Row(IndexRow row) {
+ try {
+ if (row.indexPlainRow())
+ return inlineIndexRow2Row(row);
+ else
+ return desc.toRow(ectx, row.cacheDataRow(), factory,
requiredColumns);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/** */
@@ -326,91 +251,8 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
return res;
}
- /** */
- @Override public void close() {
- release();
- }
-
- /** */
- private synchronized void reserve() {
- if (reserved != null)
- return;
-
- GridDhtPartitionTopology top = cctx.topology();
- top.readLock();
-
- GridDhtTopologyFuture topFut = top.topologyVersionFuture();
-
- boolean done = topFut.isDone();
-
- if (!done || !(topFut.topologyVersion().compareTo(topVer) >= 0
- &&
cctx.shared().exchange().lastAffinityChangedTopologyVersion(topFut.initialVersion()).compareTo(topVer)
<= 0)) {
- top.readUnlock();
-
- throw new ClusterTopologyException("Topology was changed. Please
retry on stable topology.");
- }
-
- List<GridDhtLocalPartition> toReserve;
-
- if (cctx.isReplicated()) {
- int partsCnt = cctx.affinity().partitions();
- toReserve = new ArrayList<>(partsCnt);
- for (int i = 0; i < partsCnt; i++)
- toReserve.add(top.localPartition(i));
- }
- else if (cctx.isPartitioned()) {
- assert parts != null;
-
- toReserve = new ArrayList<>(parts.length);
- for (int i = 0; i < parts.length; i++)
- toReserve.add(top.localPartition(parts[i]));
- }
- else
- toReserve = Collections.emptyList();
-
- reserved = new ArrayList<>(toReserve.size());
-
- try {
- for (GridDhtLocalPartition part : toReserve) {
- if (part == null || !part.reserve()) {
- throw new ClusterTopologyException(
- "Failed to reserve partition for query execution.
Retry on stable topology."
- );
- }
- else if (part.state() != GridDhtPartitionState.OWNING) {
- part.release();
-
- throw new ClusterTopologyException(
- "Failed to reserve partition for query execution.
Retry on stable topology."
- );
- }
-
- reserved.add(part);
- }
- }
- catch (Exception e) {
- release();
-
- throw e;
- }
- finally {
- top.readUnlock();
- }
- }
-
- /** */
- private synchronized void release() {
- if (reserved == null)
- return;
-
- for (GridDhtLocalPartition part : reserved)
- part.release();
-
- reserved = null;
- }
-
- /** {@inheritDoc} */
- @Override protected IndexQueryContext indexQueryContext() {
+ /** Query context. */
+ protected IndexQueryContext indexQueryContext() {
IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer,
parts);
InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
@@ -496,7 +338,7 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
InlineIndexKeyType keyType = F.isEmpty(inlineKeyTypes) ? null :
inlineKeyTypes.get(0);
- return new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() {
+ return new BPlusTree.TreeRowClosure<>() {
private IndexRow idxRow;
/** {@inheritDoc} */
@@ -535,7 +377,7 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
/** */
public static BPlusTree.TreeRowClosure<IndexRow, IndexRow>
createNotExpiredRowFilter() {
- return new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() {
+ return new BPlusTree.TreeRowClosure<>() {
private IndexRow idxRow;
@Override public boolean apply(
@@ -557,14 +399,61 @@ public class IndexScan<Row> extends
AbstractIndexScan<Row, IndexRow> {
};
}
+ /** */
+ protected class TxAwareTreeIndexWrapper implements TreeIndex<IndexRow> {
+ /** */
+ private final TreeIndex<IndexRow> delegate;
+
+ /** */
+ protected TxAwareTreeIndexWrapper(TreeIndex<IndexRow> delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCursor<IndexRow> find(
+ IndexRow lower,
+ IndexRow upper,
+ boolean lowerInclude,
+ boolean upperInclude
+ ) {
+ GridCursor<IndexRow> idxCursor = delegate.find(lower, upper,
lowerInclude, upperInclude);
+
+ assert txChanges != null;
+
+ // `txChanges` returns single thread data structures e.g.
`HashSet`, `ArrayList`.
+ // It is safe to use them in multiple `FilteredCursor` instances,
because a multi-range index scan is
+ // flattened into a single cursor.
+ // See TreeIndexIterable#iterator.
+ try {
+ return new SortedSegmentedIndexCursor(
+ new GridCursor[]{
+ // This call will change `txChanges.get1()` content.
+ // Removing found key from set more efficient so we
break some rules here.
+ new KeyFilteringCursor<>(idxCursor, txChanges.get1(),
r -> r.cacheDataRow().key()),
+ new SortedListRangeCursor<>(
+ IndexScan.this::compare, txChanges.get2(), lower,
upper, lowerInclude, upperInclude)
+ },
+ idx.indexDefinition()
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
/** */
protected static class TreeIndexWrapper implements TreeIndex<IndexRow> {
/** Underlying index. */
protected final InlineIndex idx;
+ /** Query context. */
+ protected final IndexQueryContext qctx;
+
/** */
- protected TreeIndexWrapper(InlineIndex idx) {
+ protected TreeIndexWrapper(InlineIndex idx, IndexQueryContext qctx) {
this.idx = idx;
+ this.qctx = qctx;
}
/** {@inheritDoc} */
@@ -572,8 +461,7 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
IndexRow lower,
IndexRow upper,
boolean lowerInclude,
- boolean upperInclude,
- IndexQueryContext qctx
+ boolean upperInclude
) {
try {
return idx.find(lower, upper, lowerInclude, upperInclude,
qctx);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
index f0163e6d91e..82973c64a37 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
@@ -108,7 +108,7 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
/**
*
*/
- private class IndexScan implements Iterable<Row>, AutoCloseable {
+ private class IndexScan implements Iterable<Row> {
/** Search row. */
private final Supplier<Row> searchRow;
@@ -119,11 +119,6 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
this.searchRow = searchRow;
}
- /** {@inheritDoc} */
- @Override public void close() {
- // No-op.
- }
-
/** {@inheritDoc} */
@NotNull @Override public Iterator<Row> iterator() {
GroupKey key = key(searchRow.get());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
index c53a81e28af..efb4092d1ba 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
@@ -21,7 +21,6 @@ import java.util.Comparator;
import java.util.Objects;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
-import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
@@ -75,11 +74,8 @@ public class RuntimeSortedIndex<Row> implements
RuntimeIndex<Row>, TreeIndex<Row
Row lower,
Row upper,
boolean lowerInclude,
- boolean upperInclude,
- IndexQueryContext qctx
+ boolean upperInclude
) {
- assert qctx == null;
-
int firstCol = F.first(collation.getKeys());
Object lowerBound = (lower == null) ? null :
ectx.rowHandler().get(firstCol, lower);
@@ -99,39 +95,6 @@ public class RuntimeSortedIndex<Row> implements
RuntimeIndex<Row>, TreeIndex<Row
RelDataType rowType,
RangeIterable<Row> ranges
) {
- return new IndexScan(rowType, this, ranges);
- }
-
- /**
- *
- */
- private class IndexScan extends AbstractIndexScan<Row, Row> {
- /**
- * @param rowType Row type.
- * @param idx Physical index.
- * @param ranges Index scan bounds.
- */
- IndexScan(
- RelDataType rowType,
- TreeIndex<Row> idx,
- RangeIterable<Row> ranges
- ) {
- super(RuntimeSortedIndex.this.ectx, rowType, idx, ranges);
- }
-
- /** {@inheritDoc} */
- @Override protected Row row2indexRow(Row bound) {
- return bound;
- }
-
- /** {@inheritDoc} */
- @Override protected Row indexRow2Row(Row row) {
- return row;
- }
-
- /** */
- @Override protected IndexQueryContext indexQueryContext() {
- return null;
- }
+ return new TreeIndexIterable<>(this, ranges);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index 2b618cf04d7..d95b2c21fe3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -26,20 +25,12 @@ import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
-import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
@@ -49,137 +40,20 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/** */
-public class TableScan<Row> implements Iterable<Row>, AutoCloseable {
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final ExecutionContext<Row> ectx;
-
- /** */
- private final CacheTableDescriptor desc;
-
- /** */
- private final RowFactory<Row> factory;
-
- /** */
- private final AffinityTopologyVersion topVer;
-
- /** */
- private final int[] parts;
-
- /** */
- private volatile List<GridDhtLocalPartition> reserved;
-
- /** Participating colunms. */
- private final ImmutableBitSet requiredColunms;
-
+public class TableScan<Row> extends AbstractCacheColumnsScan<Row> {
/** */
public TableScan(
ExecutionContext<Row> ectx,
CacheTableDescriptor desc,
int[] parts,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) {
- this.ectx = ectx;
- cctx = desc.cacheContext();
- this.desc = desc;
- this.parts = parts;
- this.requiredColunms = requiredColunms;
-
- RelDataType rowType = desc.rowType(this.ectx.getTypeFactory(),
requiredColunms);
-
- factory = this.ectx.rowHandler().factory(this.ectx.getTypeFactory(),
rowType);
- topVer = ectx.topologyVersion();
+ super(ectx, desc, parts, requiredColumns);
}
/** {@inheritDoc} */
- @Override public Iterator<Row> iterator() {
- reserve();
- try {
- return new IteratorImpl();
- }
- catch (Exception e) {
- release();
-
- throw e;
- }
- }
-
- /** */
- @Override public void close() {
- release();
- }
-
- /** */
- private synchronized void reserve() {
- if (reserved != null)
- return;
-
- GridDhtPartitionTopology top = cctx.topology();
- top.readLock();
-
- GridDhtTopologyFuture topFut = top.topologyVersionFuture();
-
- boolean done = topFut.isDone();
-
- if (!done || !(topFut.topologyVersion().compareTo(topVer) >= 0
- &&
cctx.shared().exchange().lastAffinityChangedTopologyVersion(topFut.initialVersion()).compareTo(topVer)
<= 0)) {
- top.readUnlock();
-
- throw new ClusterTopologyException("Topology was changed. Please
retry on stable topology.");
- }
-
- List<GridDhtLocalPartition> toReserve;
- if (cctx.isReplicated()) {
- int partsCnt = cctx.affinity().partitions();
- toReserve = new ArrayList<>(partsCnt);
- for (int i = 0; i < partsCnt; i++)
- toReserve.add(top.localPartition(i));
- }
- else if (cctx.isPartitioned()) {
- assert parts != null;
-
- toReserve = new ArrayList<>(parts.length);
- for (int i = 0; i < parts.length; i++)
- toReserve.add(top.localPartition(parts[i]));
- }
- else
- toReserve = Collections.emptyList();
-
- reserved = new ArrayList<>(toReserve.size());
-
- try {
- for (GridDhtLocalPartition part : toReserve) {
- if (part == null || !part.reserve())
- throw new ClusterTopologyException("Failed to reserve
partition for query execution. Retry on stable topology.");
- else if (part.state() != GridDhtPartitionState.OWNING) {
- part.release();
-
- throw new ClusterTopologyException("Failed to reserve
partition for query execution. Retry on stable topology.");
- }
-
- reserved.add(part);
- }
- }
- catch (Exception e) {
- release();
-
- throw e;
- }
- finally {
- top.readUnlock();
- }
- }
-
- /** */
- private synchronized void release() {
- if (F.isEmpty(reserved))
- return;
-
- reserved.forEach(GridDhtLocalPartition::release);
-
- reserved = null;
+ @Override protected Iterator<Row> createIterator() {
+ return new IteratorImpl();
}
/**
@@ -286,7 +160,7 @@ public class TableScan<Row> implements Iterable<Row>,
AutoCloseable {
if (!desc.match(row))
continue;
- next = desc.toRow(ectx, row, factory, requiredColunms);
+ next = desc.toRow(ectx, row, factory, requiredColumns);
break;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndex.java
index 6b3e2c47d18..9e6b37aac42 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndex.java
@@ -16,7 +16,6 @@
*/
package org.apache.ignite.internal.processors.query.calcite.exec;
-import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import org.apache.ignite.internal.util.lang.GridCursor;
/**
@@ -32,8 +31,7 @@ public interface TreeIndex<R> {
* @param upper Upper bound.
* @param lowerInclude Inclusive lower bound.
* @param upperInclude Inclusive upper bound.
- * @param qctx Index query context.
* @return Cursor over the rows within bounds.
*/
- public GridCursor<R> find(R lower, R upper, boolean lowerInclude, boolean
upperInclude, IndexQueryContext qctx);
+ public GridCursor<R> find(R lower, R upper, boolean lowerInclude, boolean
upperInclude);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndexIterable.java
similarity index 59%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndexIterable.java
index 28b40966d94..12ad66588ae 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TreeIndexIterable.java
@@ -19,9 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
-import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeCondition;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import org.apache.ignite.internal.util.lang.GridCursor;
@@ -31,34 +29,20 @@ import org.apache.ignite.lang.IgniteClosure;
import org.jetbrains.annotations.NotNull;
/**
- * Abstract index scan.
+ * Tree index iterable.
*/
-public abstract class AbstractIndexScan<Row, IdxRow> implements Iterable<Row>,
AutoCloseable {
+public class TreeIndexIterable<Row> implements Iterable<Row> {
/** */
- private final TreeIndex<IdxRow> idx;
+ private final TreeIndex<Row> idx;
/** Index scan bounds. */
private final RangeIterable<Row> ranges;
- /** */
- protected final ExecutionContext<Row> ectx;
-
- /** */
- protected final RelDataType rowType;
-
/**
- * @param ectx Execution context.
- * @param idx Physical index.
+ * @param idx Tree index.
* @param ranges Index scan bounds.
*/
- protected AbstractIndexScan(
- ExecutionContext<Row> ectx,
- RelDataType rowType,
- TreeIndex<IdxRow> idx,
- RangeIterable<Row> ranges
- ) {
- this.ectx = ectx;
- this.rowType = rowType;
+ TreeIndexIterable(TreeIndex<Row> idx, RangeIterable<Row> ranges) {
this.idx = idx;
this.ranges = ranges;
}
@@ -66,15 +50,10 @@ public abstract class AbstractIndexScan<Row, IdxRow>
implements Iterable<Row>, A
/** {@inheritDoc} */
@Override public synchronized Iterator<Row> iterator() {
if (ranges == null)
- return new IteratorImpl(indexCursor(null, null, true, true));
-
- IgniteClosure<RangeCondition<Row>, IteratorImpl> clo = range -> {
- IdxRow lower = range.lower() == null ? null :
row2indexRow(range.lower());
- IdxRow upper = range.upper() == null ? null :
row2indexRow(range.upper());
+ return new CursorIteratorImpl(idx.find(null, null, true, true));
- return new IteratorImpl(
- indexCursor(lower, upper, range.lowerInclude(),
range.upperInclude()));
- };
+ IgniteClosure<RangeCondition<Row>, CursorIteratorImpl> clo = range ->
new CursorIteratorImpl(
+ idx.find(range.lower(), range.upper(), range.lowerInclude(),
range.upperInclude()));
if (!ranges.multiBounds()) {
Iterator<RangeCondition<Row>> it = ranges.iterator();
@@ -89,34 +68,15 @@ public abstract class AbstractIndexScan<Row, IdxRow>
implements Iterable<Row>, A
}
/** */
- protected GridCursor<IdxRow> indexCursor(IdxRow lower, IdxRow upper,
boolean lowerInclude, boolean upperInclude) {
- return idx.find(lower, upper, lowerInclude, upperInclude,
indexQueryContext());
- }
-
- /** */
- protected abstract IdxRow row2indexRow(Row bound);
-
- /** */
- protected abstract Row indexRow2Row(IdxRow idxRow) throws
IgniteCheckedException;
-
- /** */
- protected abstract IndexQueryContext indexQueryContext();
-
- /** {@inheritDoc} */
- @Override public void close() {
- // No-op.
- }
-
- /** */
- private class IteratorImpl extends GridIteratorAdapter<Row> {
+ private class CursorIteratorImpl extends GridIteratorAdapter<Row> {
/** */
- private final GridCursor<IdxRow> cursor;
+ private final GridCursor<Row> cursor;
/** Next element. */
private Row next;
/** */
- private IteratorImpl(@NotNull GridCursor<IdxRow> cursor) {
+ private CursorIteratorImpl(@NotNull GridCursor<Row> cursor) {
this.cursor = cursor;
}
@@ -153,11 +113,8 @@ public abstract class AbstractIndexScan<Row, IdxRow>
implements Iterable<Row>, A
if (next != null)
return;
- while (next == null && cursor.next()) {
- IdxRow idxRow = cursor.get();
-
- next = indexRow2Row(idxRow);
- }
+ while (next == null && cursor.next())
+ next = cursor.get();
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/TransformRangeIterable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/TransformRangeIterable.java
new file mode 100644
index 00000000000..0768d62a0d0
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/TransformRangeIterable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.exp;
+
+import java.util.Iterator;
+import java.util.function.Function;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Range iterable that wraps a delegate and converts the bounds of its range conditions with a row transformer.
+ */
+public class TransformRangeIterable<FromRow, ToRow> implements
RangeIterable<ToRow> {
+ /** Source range iterable whose conditions are wrapped. */
+ private final RangeIterable<FromRow> delegate;
+
+ /** Function applied to non-null lower/upper bound rows of the delegate's conditions. */
+ private final Function<FromRow, ToRow> rowTransformer;
+
+ /** Creates an iterable over {@code delegate} ranges with bounds converted by {@code rowTransformer}. */
+ public TransformRangeIterable(RangeIterable<FromRow> delegate,
Function<FromRow, ToRow> rowTransformer) {
+ this.delegate = delegate;
+ this.rowTransformer = rowTransformer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<RangeCondition<ToRow>> iterator() {
+ return F.iterator(delegate.iterator(), c -> new
TransformRangeCondition<>(c, rowTransformer), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean multiBounds() {
+ return delegate.multiBounds();
+ }
+
+ /** Range condition view: bounds are transformed on each call, inclusion flags are passed through. */
+ private static class TransformRangeCondition<FromRow, ToRow> implements
RangeCondition<ToRow> {
+ /** Wrapped range condition. */
+ private final RangeCondition<FromRow> delegate;
+
+ /** Function applied to non-null bound rows; {@code null} bounds are returned as {@code null}. */
+ private final Function<FromRow, ToRow> rowTransformer;
+
+ /** Creates a view over {@code delegate} whose bounds are converted by {@code rowTransformer}. */
+ public TransformRangeCondition(RangeCondition<FromRow> delegate,
Function<FromRow, ToRow> rowTransformer) {
+ this.delegate = delegate;
+ this.rowTransformer = rowTransformer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ToRow lower() {
+ FromRow row = delegate.lower();
+
+ return row == null ? null : rowTransformer.apply(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ToRow upper() {
+ FromRow row = delegate.upper();
+
+ return row == null ? null : rowTransformer.apply(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean lowerInclude() {
+ return delegate.lowerInclude();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean upperInclude() {
+ return delegate.upperInclude();
+ }
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
index 450e555cfb0..4da7a17bbae 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
@@ -84,7 +84,7 @@ public class RuntimeSortedIndexTest extends
GridCommonAbstractTest {
Object[] lower = generateFindRow(rowIdLow,
testIdx.getKey(), notUnique, testIdx.getValue());
Object[] upper = generateFindRow(rowIdUp,
testIdx.getKey(), notUnique, testIdx.getValue());
- GridCursor<Object[]> cur = idx0.find(lower, upper, true,
true, null);
+ GridCursor<Object[]> cur = idx0.find(lower, upper, true,
true);
int rows = 0;
while (cur.next()) {