This is an automated email from the ASF dual-hosted git repository.

vgalaxies pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/pd-store by this push:
     new d10a68cfb chore: sync adapted codes to internal 4.0 version
d10a68cfb is described below

commit d10a68cfba32b3a4148f53e8f8109ea027e414b9
Author: VGalaxies <[email protected]>
AuthorDate: Sat Feb 3 21:51:28 2024 +0800

    chore: sync adapted codes to internal 4.0 version
---
 .../backend/cache/CachedSchemaTransactionV2.java   | 17 +++---
 .../org/apache/hugegraph/config/CoreOptions.java   |  8 +++
 .../hugegraph/meta/lock/PdDistributedLock.java     |  4 +-
 .../hugegraph/task/DistributedTaskScheduler.java   | 61 ++++++++--------------
 4 files changed, 43 insertions(+), 47 deletions(-)

diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
index b29a4227c..ef6bcc03b 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
@@ -293,13 +293,9 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
         if (value == null) {
             value = super.getSchema(type, name);
             if (value != null) {
-                this.resetCachedAllIfReachedCapacity();
-
-                this.nameCache.update(prefixedName, value);
-
-                SchemaElement schema = (SchemaElement) value;
-                Id prefixedId = generateId(schema.type(), schema.id());
-                this.idCache.update(prefixedId, schema);
+                // Note: reload all schema if the cache is inconsistent with the storage layer
+                this.clearCache(false);
+                this.loadAllSchema();
             }
         }
         return (T) value;
@@ -338,6 +334,13 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
         }
     }
 
+    private void loadAllSchema() {
+        getAllSchema(HugeType.PROPERTY_KEY);
+        getAllSchema(HugeType.VERTEX_LABEL);
+        getAllSchema(HugeType.EDGE_LABEL);
+        getAllSchema(HugeType.INDEX_LABEL);
+    }
+
     @Override
     public void clear() {
         // Clear schema info firstly
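
For reference, the first hunk above swaps per-entry cache updates for a full reload whenever a key misses the cache but is found in storage. A minimal sketch of that reload-on-miss pattern, using hypothetical stand-ins rather than the actual HugeGraph cache types:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // Hypothetical sketch, not the HugeGraph implementation: a miss that
    // then hits storage means the cache is inconsistent with the storage
    // layer, so drop everything and reload rather than patch one entry.
    class ReloadOnMissCache<K, V> {
        private final Map<K, V> cache = new ConcurrentHashMap<>();
        private final Function<K, V> storage;  // single-key storage lookup
        private final Runnable reloadAll;      // repopulates this.cache

        ReloadOnMissCache(Function<K, V> storage, Runnable reloadAll) {
            this.storage = storage;
            this.reloadAll = reloadAll;
        }

        V get(K key) {
            V value = this.cache.get(key);
            if (value == null) {
                value = this.storage.apply(key);
                if (value != null) {
                    this.cache.clear();   // mirrors clearCache(false)
                    this.reloadAll.run(); // mirrors loadAllSchema()
                }
            }
            return value;
        }
    }

The trade-off is a heavier miss path in exchange for never serving a partially updated cache.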
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index bb8745327..9f17b0cae 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -297,6 +297,14 @@ public class CoreOptions extends OptionHolder {
                     0
             );
 
+    public static final ConfigOption<Long> TASK_SCHEDULE_PERIOD =
+            new ConfigOption<>(
+                    "task.schedule_period",
+                    "Period time for the scheduler to schedule tasks (in seconds)",
+                    rangeInt(0L, Long.MAX_VALUE),
+                    10L
+            );
+
     public static final ConfigOption<Long> TASK_WAIT_TIMEOUT =
             new ConfigOption<>(
                     "task.wait_timeout",
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
index 7da78af06..3e8c66ea1 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
@@ -44,7 +44,7 @@ public class PdDistributedLock {
     public LockResult lock(String key, long second) {
         long ttl = second * 1000L;
         try {
-            LockResponse response = this.client.lock(key, ttl);
+            LockResponse response = this.client.lockWithoutReentrant(key, ttl);
             boolean succeed = response.getSucceed();
             LockResult result = new LockResult();
             if (succeed) {
@@ -56,7 +56,7 @@ public class PdDistributedLock {
                     synchronized (result) {
                         keepAlive(key);
                     }
-                }, 10, period, TimeUnit.MILLISECONDS);
+                }, period, period, TimeUnit.MILLISECONDS);
                 result.setFuture(future);
             }
             return result;
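
The second change above delays the first keep-alive renewal by one full period instead of firing roughly 10ms after the lock is granted; a fresh lease has no need for near-immediate renewal. A sketch of that scheduling shape, where renewLease and the half-TTL period are illustrative assumptions:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Hypothetical keep-alive wiring: first renewal after one full
    // period, then every period thereafter.
    class KeepAliveSketch {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        ScheduledFuture<?> start(Runnable renewLease, long ttlMs) {
            long period = ttlMs / 2;  // assumed: renew at half the TTL
            return this.scheduler.scheduleAtFixedRate(
                    renewLease,
                    period,  // initial delay == period, not ~0
                    period,
                    TimeUnit.MILLISECONDS);
        }
    }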
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
index 09d6c3d50..c29d7a3b8 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
@@ -17,7 +17,6 @@
 
 package org.apache.hugegraph.task;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,16 +29,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
 
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.HugeGraphParams;
 import org.apache.hugegraph.backend.id.Id;
-import org.apache.hugegraph.backend.page.PageInfo;
 import org.apache.hugegraph.backend.query.QueryResults;
-import org.apache.hugegraph.concurrent.LockGroup;
-import org.apache.hugegraph.concurrent.LockManager;
 import org.apache.hugegraph.config.CoreOptions;
 import org.apache.hugegraph.exception.ConnectionException;
 import org.apache.hugegraph.exception.NotFoundException;
@@ -53,7 +48,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.slf4j.Logger;
 
 public class DistributedTaskScheduler extends TaskAndResultScheduler {
-    protected static final int SCHEDULE_PERIOD = 10;
+    private final long schedulePeriod;
     private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
     private final ExecutorService taskDbExecutor;
     private final ExecutorService schemaTaskExecutor;
@@ -63,8 +58,6 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
     private final ScheduledThreadPoolExecutor schedulerExecutor;
     private final ScheduledFuture<?> cronFuture;
 
-    private final String lockGroupName;
-
     /**
      * the status of scheduler
      */
@@ -90,11 +83,11 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
 
         this.schedulerExecutor = schedulerExecutor;
 
-        lockGroupName = String.format("%s_%s_distributed", graphSpace, graph);
-        LockManager.instance().create(lockGroupName);
-
         this.closed.set(false);
 
+        this.schedulePeriod = this.graph.configuration()
+                                        .get(CoreOptions.TASK_SCHEDULE_PERIOD);
+
         this.cronFuture = this.schedulerExecutor.scheduleWithFixedDelay(
             () -> {
                 // TODO: uncomment later - graph space
@@ -105,14 +98,15 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
                     // TaskManager.useAdmin();
                     this.cronSchedule();
                 } catch (Throwable t) {
-                    LOG.info("cronScheduler exception ", t);
+                    // TODO: log with graph space
+                    LOG.info("cronScheduler exception graph: {}", 
this.graphName(), t);
                 } finally {
                     // TODO: uncomment later - graph space
                     LockUtil.unlock("", LockUtil.GRAPH_LOCK);
                    // LockUtil.unlock(this.graph().spaceGraphName(), LockUtil.GRAPH_LOCK);
                 }
             },
-            10L, SCHEDULE_PERIOD,
+            10L, schedulePeriod,
             TimeUnit.SECONDS);
     }
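
Since the scheduler uses scheduleWithFixedDelay, the configured period is measured from the end of one cronSchedule() run to the start of the next, so a slow sweep can never overlap itself. A brief contrast of the two JDK scheduling modes:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class SchedulingModes {
        static void demo(Runnable task, long period) {
            ScheduledExecutorService ses =
                    Executors.newSingleThreadScheduledExecutor();
            // Fixed delay (used above): the next run starts `period`
            // seconds after the previous run finishes.
            ses.scheduleWithFixedDelay(task, 10L, period, TimeUnit.SECONDS);
            // Fixed rate: runs are spaced `period` apart start-to-start
            // and can bunch up behind a slow run.
            ses.scheduleAtFixedRate(task, 10L, period, TimeUnit.SECONDS);
        }
    }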
 
@@ -158,9 +152,14 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
                 LOG.info("Try to update task({})@({}/{}) status" +
                          "(RUNNING->FAILED)", running.id(), this.graphSpace,
                          this.graphName);
-                updateStatusWithLock(running.id(), TaskStatus.RUNNING,
-                                     TaskStatus.FAILED);
-                runningTasks.remove(running.id());
+                if (updateStatusWithLock(running.id(), TaskStatus.RUNNING,
+                                         TaskStatus.FAILED)) {
+                    runningTasks.remove(running.id());
+                } else {
+                    LOG.warn("Update task({})@({}/{}) status" +
+                             "(RUNNING->FAILED) failed",
+                             running.id(), this.graphSpace, this.graphName);
+                }
             }
         }
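
The fix above only drops a task from runningTasks once the RUNNING->FAILED transition actually took effect. The boolean contract of updateStatusWithLock behaves like a compare-and-set; a minimal sketch of those semantics with a plain AtomicReference rather than the HugeGraph types:

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: the transition applies only if the current
    // status still equals the expected one, and callers must check the
    // result before forgetting the task, as the fixed code above does.
    class TaskStatusHolder {
        private final AtomicReference<String> status =
                new AtomicReference<>("RUNNING");

        boolean transition(String expected, String target) {
            return this.status.compareAndSet(expected, target);
        }
    }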
 
@@ -484,7 +483,7 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
 
             return true;
         } else {
-            LOG.info("Update task({}) status conflict: current({}), " +
+            LOG.warn("Update task({}) status conflict: current({}), " +
                      "pre({}), status({})", id, task.status(),
                      prestatus, status);
             return false;
@@ -542,7 +541,7 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
         if (executor.getActiveCount() < executor.getMaximumPoolSize()) {
             TaskRunner<?> runner = new TaskRunner<>(task);
             chosenExecutor.submit(runner);
-            LOG.info("Start task({})@({}/{})", task.id(),
+            LOG.info("Submit task({})@({}/{})", task.id(),
                      this.graphSpace, this.graphName);
 
             return true;
@@ -570,23 +569,12 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
 
         LockResult lockResult = new LockResult();
 
-        LockGroup lockGroup = LockManager.instance().get(lockGroupName);
-        Lock localLock = lockGroup.lock(taskId);
-
-        if (!localLock.tryLock()) {
-            return new LockResult();
-        }
-
         try {
             lockResult =
                 MetaManager.instance().tryLockTask(graphSpace, graphName,
                                                    taskId);
         } catch (Throwable t) {
-            LOG.info(String.format("try to lock task(%s) error", taskId), t);
-        }
-
-        if (!lockResult.lockSuccess()) {
-            localLock.unlock();
+            LOG.warn(String.format("try to lock task(%s) error", taskId), t);
         }
 
         return lockResult;
@@ -598,14 +586,9 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
             MetaManager.instance().unlockTask(graphSpace, graphName, taskId,
                                               lockResult);
         } catch (Throwable t) {
-            LOG.info(String.format("try to unlock task(%s) error",
+            LOG.warn(String.format("try to unlock task(%s) error",
                                    taskId), t);
         }
-
-        LockGroup lockGroup = LockManager.instance().get(lockGroupName);
-        Lock localLock = lockGroup.lock(taskId);
-
-        localLock.unlock();
     }
 
     private boolean isLockedTask(String taskId) {
@@ -626,7 +609,9 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
             LockResult lockResult = tryLockTask(task.id().asString());
 
             initTaskParams(task);
-            if (lockResult.lockSuccess()) {
+            if (lockResult.lockSuccess() && !task.completed()) {
+
+                LOG.info("Start task({})", task.id());
 
                 TaskManager.setContext(task.context());
                 try {
@@ -644,7 +629,7 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
                    // Task execution does not throw exceptions; while running, HugeTask catches any exception and stores it in the DB
                     task.run();
                 } catch (Throwable t) {
-                    LOG.info("exception when execute task", t);
+                    LOG.warn("exception when execute task", t);
                 } finally {
                     runningTasks.remove(task.id());
                     unlockTask(task.id().asString(), lockResult);
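
Taken together with the removal of the local LockGroup earlier in this file, the MetaManager distributed lock is now the only guard, and the new !task.completed() check re-validates the task after the lock is won, since another node may have finished it in the meantime. A hedged sketch of the resulting execute path, reusing the names from the diff:

    // Hypothetical sketch: win the distributed lock, re-check completion
    // under it, run the task, and always unlock in finally.
    private void executeIfOwner(HugeTask<?> task) {
        LockResult lock = tryLockTask(task.id().asString());
        if (lock.lockSuccess() && !task.completed()) {
            try {
                task.run();
            } catch (Throwable t) {
                LOG.warn("exception when execute task", t);
            } finally {
                runningTasks.remove(task.id());
                unlockTask(task.id().asString(), lock);
            }
        }
    }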
