This is an automated email from the ASF dual-hosted git repository.
vgalaxies pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/pd-store by this push:
new d10a68cfb chore: sync adapted codes to internal 4.0 version
d10a68cfb is described below
commit d10a68cfba32b3a4148f53e8f8109ea027e414b9
Author: VGalaxies <[email protected]>
AuthorDate: Sat Feb 3 21:51:28 2024 +0800
chore: sync adapted codes to internal 4.0 version
---
.../backend/cache/CachedSchemaTransactionV2.java | 17 +++---
.../org/apache/hugegraph/config/CoreOptions.java | 8 +++
.../hugegraph/meta/lock/PdDistributedLock.java | 4 +-
.../hugegraph/task/DistributedTaskScheduler.java | 61 ++++++++--------------
4 files changed, 43 insertions(+), 47 deletions(-)
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
index b29a4227c..ef6bcc03b 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
@@ -293,13 +293,9 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
if (value == null) {
value = super.getSchema(type, name);
if (value != null) {
- this.resetCachedAllIfReachedCapacity();
-
- this.nameCache.update(prefixedName, value);
-
- SchemaElement schema = (SchemaElement) value;
- Id prefixedId = generateId(schema.type(), schema.id());
- this.idCache.update(prefixedId, schema);
+ // Note: reload all schema if the cache is inconsistent with
storage layer
+ this.clearCache(false);
+ this.loadAllSchema();
}
}
return (T) value;
@@ -338,6 +334,13 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
}
}
+ /**
+ * Calls getAllSchema once per schema type, in the order property key,
+ * vertex label, edge label, index label; the returned values are unused.
+ * NOTE(review): presumably getAllSchema repopulates the schema caches
+ * after they were cleared - confirm against its implementation.
+ */
+ private void loadAllSchema() {
+ HugeType[] schemaTypes = {HugeType.PROPERTY_KEY, HugeType.VERTEX_LABEL,
+ HugeType.EDGE_LABEL, HugeType.INDEX_LABEL};
+ for (HugeType schemaType : schemaTypes) {
+ getAllSchema(schemaType);
+ }
+ }
+
@Override
public void clear() {
// Clear schema info firstly
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index bb8745327..9f17b0cae 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -297,6 +297,14 @@ public class CoreOptions extends OptionHolder {
0
);
+ /**
+ * Delay, in seconds, between two consecutive runs of the cron job of
+ * DistributedTaskScheduler (the value is passed as the delay argument of
+ * scheduleWithFixedDelay with TimeUnit.SECONDS).
+ * The lower bound is 1 rather than 0 because scheduleWithFixedDelay
+ * throws IllegalArgumentException when the delay is <= 0, which would
+ * make the scheduler fail to start with an otherwise "valid" config.
+ */
+ public static final ConfigOption<Long> TASK_SCHEDULE_PERIOD =
+ new ConfigOption<>(
+ "task.schedule_period",
+ "Period time when scheduler to schedule task",
+ rangeInt(1L, Long.MAX_VALUE),
+ 10L
+ );
+
public static final ConfigOption<Long> TASK_WAIT_TIMEOUT =
new ConfigOption<>(
"task.wait_timeout",
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
index 7da78af06..3e8c66ea1 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/lock/PdDistributedLock.java
@@ -44,7 +44,7 @@ public class PdDistributedLock {
public LockResult lock(String key, long second) {
long ttl = second * 1000L;
try {
- LockResponse response = this.client.lock(key, ttl);
+ LockResponse response = this.client.lockWithoutReentrant(key, ttl);
boolean succeed = response.getSucceed();
LockResult result = new LockResult();
if (succeed) {
@@ -56,7 +56,7 @@ public class PdDistributedLock {
synchronized (result) {
keepAlive(key);
}
- }, 10, period, TimeUnit.MILLISECONDS);
+ }, period, period, TimeUnit.MILLISECONDS);
result.setFuture(future);
}
return result;
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
index 09d6c3d50..c29d7a3b8 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
@@ -17,7 +17,6 @@
package org.apache.hugegraph.task;
-import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,16 +29,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.id.Id;
-import org.apache.hugegraph.backend.page.PageInfo;
import org.apache.hugegraph.backend.query.QueryResults;
-import org.apache.hugegraph.concurrent.LockGroup;
-import org.apache.hugegraph.concurrent.LockManager;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.exception.ConnectionException;
import org.apache.hugegraph.exception.NotFoundException;
@@ -53,7 +48,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
public class DistributedTaskScheduler extends TaskAndResultScheduler {
- protected static final int SCHEDULE_PERIOD = 10;
+ private final long schedulePeriod;
private static final Logger LOG =
Log.logger(DistributedTaskScheduler.class);
private final ExecutorService taskDbExecutor;
private final ExecutorService schemaTaskExecutor;
@@ -63,8 +58,6 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
private final ScheduledThreadPoolExecutor schedulerExecutor;
private final ScheduledFuture<?> cronFuture;
- private final String lockGroupName;
-
/**
* the status of scheduler
*/
@@ -90,11 +83,11 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
this.schedulerExecutor = schedulerExecutor;
- lockGroupName = String.format("%s_%s_distributed", graphSpace, graph);
- LockManager.instance().create(lockGroupName);
-
this.closed.set(false);
+ this.schedulePeriod = this.graph.configuration()
+ .get(CoreOptions.TASK_SCHEDULE_PERIOD);
+
this.cronFuture = this.schedulerExecutor.scheduleWithFixedDelay(
() -> {
// TODO: uncomment later - graph space
@@ -105,14 +98,15 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
// TaskManager.useAdmin();
this.cronSchedule();
} catch (Throwable t) {
- LOG.info("cronScheduler exception ", t);
+ // TODO: log with graph space
+ LOG.info("cronScheduler exception graph: {}",
this.graphName(), t);
} finally {
// TODO: uncomment later - graph space
LockUtil.unlock("", LockUtil.GRAPH_LOCK);
// LockUtil.unlock(this.graph().spaceGraphName(),
LockUtil.GRAPH_LOCK);
}
},
- 10L, SCHEDULE_PERIOD,
+ 10L, schedulePeriod,
TimeUnit.SECONDS);
}
@@ -158,9 +152,14 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
LOG.info("Try to update task({})@({}/{}) status" +
"(RUNNING->FAILED)", running.id(), this.graphSpace,
this.graphName);
- updateStatusWithLock(running.id(), TaskStatus.RUNNING,
- TaskStatus.FAILED);
- runningTasks.remove(running.id());
+ if (updateStatusWithLock(running.id(), TaskStatus.RUNNING,
+ TaskStatus.FAILED)) {
+ runningTasks.remove(running.id());
+ } else {
+ LOG.warn("Update task({})@({}/{}) status" +
+ "(RUNNING->FAILED) failed",
+ running.id(), this.graphSpace, this.graphName);
+ }
}
}
@@ -484,7 +483,7 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
return true;
} else {
- LOG.info("Update task({}) status conflict: current({}), " +
+ LOG.warn("Update task({}) status conflict: current({}), " +
"pre({}), status({})", id, task.status(),
prestatus, status);
return false;
@@ -542,7 +541,7 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
if (executor.getActiveCount() < executor.getMaximumPoolSize()) {
TaskRunner<?> runner = new TaskRunner<>(task);
chosenExecutor.submit(runner);
- LOG.info("Start task({})@({}/{})", task.id(),
+ LOG.info("Submit task({})@({}/{})", task.id(),
this.graphSpace, this.graphName);
return true;
@@ -570,23 +569,12 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
LockResult lockResult = new LockResult();
- LockGroup lockGroup = LockManager.instance().get(lockGroupName);
- Lock localLock = lockGroup.lock(taskId);
-
- if (!localLock.tryLock()) {
- return new LockResult();
- }
-
try {
lockResult =
MetaManager.instance().tryLockTask(graphSpace, graphName,
taskId);
} catch (Throwable t) {
- LOG.info(String.format("try to lock task(%s) error", taskId), t);
- }
-
- if (!lockResult.lockSuccess()) {
- localLock.unlock();
+ LOG.warn(String.format("try to lock task(%s) error", taskId), t);
}
return lockResult;
@@ -598,14 +586,9 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
MetaManager.instance().unlockTask(graphSpace, graphName, taskId,
lockResult);
} catch (Throwable t) {
- LOG.info(String.format("try to unlock task(%s) error",
+ LOG.warn(String.format("try to unlock task(%s) error",
taskId), t);
}
-
- LockGroup lockGroup = LockManager.instance().get(lockGroupName);
- Lock localLock = lockGroup.lock(taskId);
-
- localLock.unlock();
}
private boolean isLockedTask(String taskId) {
@@ -626,7 +609,9 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
LockResult lockResult = tryLockTask(task.id().asString());
initTaskParams(task);
- if (lockResult.lockSuccess()) {
+ if (lockResult.lockSuccess() && !task.completed()) {
+
+ LOG.info("Start task({})", task.id());
TaskManager.setContext(task.context());
try {
@@ -644,7 +629,7 @@ public class DistributedTaskScheduler extends
TaskAndResultScheduler {
// 任务执行不会抛出异常,HugeTask 在执行过程中,会捕获异常,并存储到 DB 中
task.run();
} catch (Throwable t) {
- LOG.info("exception when execute task", t);
+ LOG.warn("exception when execute task", t);
} finally {
runningTasks.remove(task.id());
unlockTask(task.id().asString(), lockResult);