This is an automated email from the ASF dual-hosted git repository. vaughn pushed a commit to branch zy_dev in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 9fc12c1f0abcfbe487a12812ca368bfc2b0aad53 Author: vaughn <[email protected]> AuthorDate: Sun Apr 23 15:56:59 2023 +0800 chore: single task and batch consume remove left index task --- .../backend/tx/GraphIndexTransaction.java | 158 ++++++++++++++++++--- 1 file changed, 138 insertions(+), 20 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java index f4ff8b32e..0b832e78c 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java @@ -27,10 +27,17 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import groovy.lang.Tuple2; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hugegraph.backend.page.PageIds; import org.apache.hugegraph.backend.page.PageState; @@ -95,6 +102,8 @@ import org.apache.hugegraph.util.NumericUtil; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import javax.security.auth.callback.Callback; + public class GraphIndexTransaction extends AbstractTransaction { public static final String START_SYMBOL = "("; @@ -1717,22 +1726,79 @@ public class GraphIndexTransaction extends AbstractTransaction { } } + public static class RemoveLeftIndexJobHelper { + + public static final int CAPACITY = 2000; + + private final BlockingQueue<Tuple2<ConditionQuery, HugeElement>> pendingQueue = + new ArrayBlockingQueue<>(CAPACITY); + + private AtomicReference<State> state; + + private HugeGraph graph; + + enum State { + INIT, + EXECUTE, + } + + private RemoveLeftIndexJobHelper(HugeGraph graph) { + this.state = new AtomicReference<>(State.INIT); + } + + public void add(ConditionQuery query, HugeElement element) { + if (query == null || element == null) { + return; + } + + if (!pendingQueue.offer(new Tuple2(query, element))) { + LOG.warn("The pending queue of RemoveLeftIndexJob is full"); + return; + } + + this.reSchedule(); + } + + public void consumeComplete() { + this.state.compareAndSet(State.EXECUTE, State.INIT); + } + + public void reSchedule() { + if (this.state.compareAndSet(State.INIT, State.EXECUTE)) { + try { + RemoveLeftIndexJob job = new RemoveLeftIndexJob(pendingQueue, this::consumeComplete, this::reSchedule); + EphemeralJobBuilder.of(this.graph) + .name("batch-remove-left-index") + .job(job) + .schedule(); + } catch (Throwable e) { + LOG.warn("Failed to schedule RemoveLeftIndexJob", e); + this.state.compareAndSet(State.EXECUTE, State.INIT); + } + } + } + } + public static class RemoveLeftIndexJob extends EphemeralJob<Object> { private static final String REMOVE_LEFT_INDEX = "remove_left_index"; + public static final int MAX_CONSUME_COUNT = 1000; - private final ConditionQuery query; - private final HugeElement element; private GraphIndexTransaction tx; - private Set<ConditionQuery.LeftIndex> leftIndexes; - private RemoveLeftIndexJob(ConditionQuery query, HugeElement element) { - E.checkArgumentNotNull(query, "query"); - E.checkArgumentNotNull(element, "element"); - this.query = query; - this.element = element; - this.tx = null; - this.leftIndexes = query.getLeftIndexOfElement(element.id()); + private Queue<Tuple2<ConditionQuery, HugeElement>> queue; + private Runnable completeCallback; + private Runnable scheduler; + + private RemoveLeftIndexJob(Queue<Tuple2<ConditionQuery, HugeElement>> queue, + Runnable completeCallback, + Runnable scheduler) { + E.checkArgumentNotNull(queue, "The queue can't be null"); + E.checkArgumentNotNull(completeCallback, "The callback can't be null"); + E.checkArgumentNotNull(scheduler, "The scheduler can't be null"); + this.queue = queue; + this.completeCallback = completeCallback; + this.scheduler = scheduler; } @Override @@ -1742,10 +1808,61 @@ public class GraphIndexTransaction extends AbstractTransaction { @Override public Object execute() { - this.tx = this.element.schemaLabel().system() ? - this.params().systemTransaction().indexTransaction() : - this.params().graphTransaction().indexTransaction(); - return this.removeIndexLeft(this.query, this.element); + boolean stop = false; + List<Tuple2<ConditionQuery, HugeElement>> systemElements = new ArrayList<>(); + List<Tuple2<ConditionQuery, HugeElement>> graphElements = new ArrayList<>(); + final int pageSize = 100; + int count = 0; + int consumeCount = 0; + while (!stop) { + + while (!this.queue.isEmpty() && (systemElements.size() + graphElements.size()) < pageSize) { + Tuple2<ConditionQuery, HugeElement> query2Element = this.queue.poll(); + if (query2Element.getSecond().schemaLabel().system()) { + systemElements.add(query2Element); + } else { + graphElements.add(query2Element); + } + + consumeCount ++; + } + + if (!systemElements.isEmpty()) { + this.tx = this.params().systemTransaction().indexTransaction(); + for (Tuple2<ConditionQuery, HugeElement> query2Element : systemElements) { + try { + count += this.removeIndexLeft(query2Element.getFirst(), query2Element.getSecond()); + } catch (Throwable e) { + LOG.warn("Failed to remove left index for system element {}", query2Element.getSecond().id(), e); + } + } + this.tx.commit(); + systemElements.clear(); + } + + if (!graphElements.isEmpty()) { + this.tx = this.params().graphTransaction().indexTransaction(); + for (Tuple2<ConditionQuery, HugeElement> query2Element : graphElements) { + try { + count += this.removeIndexLeft(query2Element.getFirst(), query2Element.getSecond()); + } catch (Throwable e) { + LOG.warn("Failed to remove left index for graph element {}", query2Element.getSecond().id(), e); + } + } + this.tx.commit(); + graphElements.clear(); + } + + if (this.queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT) { + this.completeCallback.run(); + stop = true; + if (!this.queue.isEmpty()) { + this.scheduler.run(); + } + } + } + + return count; } protected long removeIndexLeft(ConditionQuery query, @@ -1776,24 +1893,25 @@ public class GraphIndexTransaction extends AbstractTransaction { long sCount = 0; for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) { // Process range index - rCount += this.processRangeIndexLeft(cq, element); + rCount += this.processRangeIndexLeft(query, cq, element); // Process secondary index or search index sCount += this.processSecondaryOrSearchIndexLeft(cq, element); } - this.tx.commit(); return rCount + sCount; } private long processRangeIndexLeft(ConditionQuery query, + ConditionQuery flattenQuery, HugeElement element) { long count = 0; - if (this.leftIndexes == null) { + Set<ConditionQuery.LeftIndex> leftIndexes = query.getLeftIndexOfElement(element.id()); + if (CollectionUtils.isEmpty(leftIndexes)) { return count; } - for (ConditionQuery.LeftIndex leftIndex : this.leftIndexes) { + for (ConditionQuery.LeftIndex leftIndex : leftIndexes) { Set<Object> indexValues = leftIndex.indexFieldValues(); - IndexLabel indexLabel = this.findMatchedIndexLabel(query, + IndexLabel indexLabel = this.findMatchedIndexLabel(flattenQuery, leftIndex); assert indexLabel != null; @@ -1807,7 +1925,7 @@ public class GraphIndexTransaction extends AbstractTransaction { } } // Remove LeftIndex after constructing remove job - this.query.removeElementLeftIndex(element.id()); + query.removeElementLeftIndex(element.id()); this.tx.commit(); return count; }
