This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 9fc12c1f0abcfbe487a12812ca368bfc2b0aad53
Author: vaughn <[email protected]>
AuthorDate: Sun Apr 23 15:56:59 2023 +0800

    chore: single task and batch consume remove left index task
---
 .../backend/tx/GraphIndexTransaction.java          | 158 ++++++++++++++++++---
 1 file changed, 138 insertions(+), 20 deletions(-)

diff --git 
a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
 
b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
index f4ff8b32e..0b832e78c 100644
--- 
a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
+++ 
b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
@@ -27,10 +27,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import groovy.lang.Tuple2;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hugegraph.backend.page.PageIds;
 import org.apache.hugegraph.backend.page.PageState;
@@ -95,6 +102,8 @@ import org.apache.hugegraph.util.NumericUtil;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import javax.security.auth.callback.Callback;
+
 public class GraphIndexTransaction extends AbstractTransaction {
 
     public static final String START_SYMBOL = "(";
@@ -1717,22 +1726,79 @@ public class GraphIndexTransaction extends 
AbstractTransaction {
         }
     }
 
+    public static class RemoveLeftIndexJobHelper {
+
+        public static final int CAPACITY = 2000;
+
+        private final BlockingQueue<Tuple2<ConditionQuery, HugeElement>> 
pendingQueue =
+                             new ArrayBlockingQueue<>(CAPACITY);
+
+        private AtomicReference<State> state;
+
+        private HugeGraph graph;
+
+        enum State {
+            INIT,
+            EXECUTE,
+        }
+
+        private RemoveLeftIndexJobHelper(HugeGraph graph) {
+            this.state = new AtomicReference<>(State.INIT);
+        }
+
+        public void add(ConditionQuery query, HugeElement element) {
+            if (query == null || element == null) {
+                return;
+            }
+
+            if (!pendingQueue.offer(new Tuple2(query, element))) {
+                LOG.warn("The pending queue of RemoveLeftIndexJob is full");
+                return;
+            }
+
+           this.reSchedule();
+        }
+
+        public void consumeComplete() {
+            this.state.compareAndSet(State.EXECUTE, State.INIT);
+        }
+
+        public void reSchedule() {
+            if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
+                try {
+                    RemoveLeftIndexJob job = new 
RemoveLeftIndexJob(pendingQueue, this::consumeComplete, this::reSchedule);
+                    EphemeralJobBuilder.of(this.graph)
+                            .name("batch-remove-left-index")
+                            .job(job)
+                            .schedule();
+                } catch (Throwable e) {
+                    LOG.warn("Failed to schedule RemoveLeftIndexJob", e);
+                    this.state.compareAndSet(State.EXECUTE, State.INIT);
+                }
+            }
+        }
+    }
+
     public static class RemoveLeftIndexJob extends EphemeralJob<Object> {
 
         private static final String REMOVE_LEFT_INDEX = "remove_left_index";
+        public static final int MAX_CONSUME_COUNT = 1000;
 
-        private final ConditionQuery query;
-        private final HugeElement element;
         private GraphIndexTransaction tx;
-        private Set<ConditionQuery.LeftIndex> leftIndexes;
 
-        private RemoveLeftIndexJob(ConditionQuery query, HugeElement element) {
-            E.checkArgumentNotNull(query, "query");
-            E.checkArgumentNotNull(element, "element");
-            this.query = query;
-            this.element = element;
-            this.tx = null;
-            this.leftIndexes = query.getLeftIndexOfElement(element.id());
+        private Queue<Tuple2<ConditionQuery, HugeElement>> queue;
+        private Runnable completeCallback;
+        private Runnable scheduler;
+
+        private RemoveLeftIndexJob(Queue<Tuple2<ConditionQuery, HugeElement>> 
queue,
+                                   Runnable completeCallback,
+                                   Runnable scheduler) {
+            E.checkArgumentNotNull(queue, "The queue can't be null");
+            E.checkArgumentNotNull(completeCallback, "The callback can't be 
null");
+            E.checkArgumentNotNull(scheduler, "The scheduler can't be null");
+            this.queue = queue;
+            this.completeCallback = completeCallback;
+            this.scheduler = scheduler;
         }
 
         @Override
@@ -1742,10 +1808,61 @@ public class GraphIndexTransaction extends 
AbstractTransaction {
 
         @Override
         public Object execute() {
-            this.tx = this.element.schemaLabel().system() ?
-                      this.params().systemTransaction().indexTransaction() :
-                      this.params().graphTransaction().indexTransaction();
-            return this.removeIndexLeft(this.query, this.element);
+            boolean stop = false;
+            List<Tuple2<ConditionQuery, HugeElement>> systemElements = new 
ArrayList<>();
+            List<Tuple2<ConditionQuery, HugeElement>> graphElements = new 
ArrayList<>();
+            final int pageSize = 100;
+            int count = 0;
+            int consumeCount = 0;
+            while (!stop) {
+
+                while (!this.queue.isEmpty() && (systemElements.size() + 
graphElements.size()) < pageSize) {
+                    Tuple2<ConditionQuery, HugeElement> query2Element = 
this.queue.poll();
+                    if (query2Element.getSecond().schemaLabel().system()) {
+                        systemElements.add(query2Element);
+                    } else {
+                        graphElements.add(query2Element);
+                    }
+
+                    consumeCount ++;
+                }
+
+                if (!systemElements.isEmpty()) {
+                    this.tx = 
this.params().systemTransaction().indexTransaction();
+                    for (Tuple2<ConditionQuery, HugeElement> query2Element : 
systemElements) {
+                        try {
+                            count += 
this.removeIndexLeft(query2Element.getFirst(), query2Element.getSecond());
+                        } catch (Throwable e) {
+                            LOG.warn("Failed to remove left index for system 
element {}", query2Element.getSecond().id(), e);
+                        }
+                    }
+                    this.tx.commit();
+                    systemElements.clear();
+                }
+
+                if (!graphElements.isEmpty()) {
+                    this.tx = 
this.params().graphTransaction().indexTransaction();
+                    for (Tuple2<ConditionQuery, HugeElement> query2Element : 
graphElements) {
+                        try {
+                            count += 
this.removeIndexLeft(query2Element.getFirst(), query2Element.getSecond());
+                        } catch (Throwable e) {
+                            LOG.warn("Failed to remove left index for graph 
element {}", query2Element.getSecond().id(), e);
+                        }
+                    }
+                    this.tx.commit();
+                    graphElements.clear();
+                }
+
+                if (this.queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT) {
+                    this.completeCallback.run();
+                    stop = true;
+                    if (!this.queue.isEmpty()) {
+                        this.scheduler.run();
+                    }
+                }
+            }
+
+            return count;
         }
 
         protected long removeIndexLeft(ConditionQuery query,
@@ -1776,24 +1893,25 @@ public class GraphIndexTransaction extends 
AbstractTransaction {
             long sCount = 0;
             for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
                 // Process range index
-                rCount += this.processRangeIndexLeft(cq, element);
+                rCount += this.processRangeIndexLeft(query, cq, element);
                 // Process secondary index or search index
                 sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
             }
-            this.tx.commit();
             return rCount + sCount;
         }
 
         private long processRangeIndexLeft(ConditionQuery query,
+                                           ConditionQuery flattenQuery,
                                            HugeElement element) {
             long count = 0;
-            if (this.leftIndexes == null) {
+            Set<ConditionQuery.LeftIndex> leftIndexes = 
query.getLeftIndexOfElement(element.id());
+            if (CollectionUtils.isEmpty(leftIndexes)) {
                 return count;
             }
 
-            for (ConditionQuery.LeftIndex leftIndex : this.leftIndexes) {
+            for (ConditionQuery.LeftIndex leftIndex : leftIndexes) {
                 Set<Object> indexValues = leftIndex.indexFieldValues();
-                IndexLabel indexLabel = this.findMatchedIndexLabel(query,
+                IndexLabel indexLabel = 
this.findMatchedIndexLabel(flattenQuery,
                                                                    leftIndex);
                 assert indexLabel != null;
 
@@ -1807,7 +1925,7 @@ public class GraphIndexTransaction extends 
AbstractTransaction {
                 }
             }
             // Remove LeftIndex after constructing remove job
-            this.query.removeElementLeftIndex(element.id());
+            query.removeElementLeftIndex(element.id());
             this.tx.commit();
             return count;
         }

Reply via email to