This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new e1d960741 refact(core): optimized batch removal of remaining indices consumed by a single consumer (#2203)
e1d960741 is described below
commit e1d960741074208c0c4051b68b391436928cc8a7
Author: vaughn <[email protected]>
AuthorDate: Wed May 31 12:36:52 2023 +0800
refact(core): optimized batch removal of remaining indices consumed by a single consumer (#2203)
---
.../java/org/apache/hugegraph/HugeGraphParams.java | 3 +
.../org/apache/hugegraph/StandardHugeGraph.java | 8 +
.../backend/store/raft/StoreSnapshotFile.java | 4 +-
.../backend/store/raft/rpc/RpcForwarder.java | 9 +-
.../backend/tx/GraphIndexTransaction.java | 35 +--
.../apache/hugegraph/task/EphemeralJobQueue.java | 241 +++++++++++++++++++++
.../org/apache/hugegraph/util/CompressUtil.java | 4 +-
.../backend/store/rocksdb/OpenedRocksDB.java | 2 +-
.../backend/store/rocksdb/RocksDBStdSessions.java | 4 +-
.../backend/store/rocksdb/RocksDBStore.java | 8 +-
.../store/rocksdbsst/RocksDBSstSessions.java | 2 +-
11 files changed, 292 insertions(+), 28 deletions(-)
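In short: instead of scheduling one HugeTask per left-index removal, producers now push RemoveLeftIndexJob instances onto a bounded EphemeralJobQueue, and a single BatchEphemeralJob consumer drains and commits them in batches. The producer side reduces to the two lines below (a sketch of the call site this commit introduces in GraphIndexTransaction; surrounding setup assumed):

    RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
    this.params().submitEphemeralJob(job);  // enqueued; one consumer batches the commits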
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
index 0b45f7fbc..e655b7c02 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
@@ -23,6 +23,7 @@ import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
+import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.task.ServerInfoManager;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.type.define.GraphReadMode;
@@ -90,4 +91,6 @@ public interface HugeGraphParams {
RateLimiter readRateLimiter();
RamTable ramtable();
+
+ <T> void submitEphemeralJob(EphemeralJob<T> job);
}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index c1d0108f7..42a67158e 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -51,6 +51,7 @@ import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.ram.RamTable;
+import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.CoreOptions;
@@ -60,6 +61,7 @@ import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.exception.NotAllowException;
import org.apache.hugegraph.io.HugeGraphIoRegistry;
+import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.masterelection.ClusterRoleStore;
import org.apache.hugegraph.masterelection.Config;
import org.apache.hugegraph.masterelection.RoleElectionConfig;
@@ -1163,6 +1165,7 @@ public class StandardHugeGraph implements HugeGraph {
private class StandardHugeGraphParams implements HugeGraphParams {
private HugeGraph graph = StandardHugeGraph.this;
+ private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this);
private void graph(HugeGraph graph) {
this.graph = graph;
@@ -1304,6 +1307,11 @@ public class StandardHugeGraph implements HugeGraph {
public RamTable ramtable() {
return StandardHugeGraph.this.ramtable;
}
+
+ @Override
+ public <T> void submitEphemeralJob(EphemeralJob<T> job) {
+ this.ephemeralJobQueue.add(job);
+ }
}
private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
index 6cf08f1e5..7c8d60458 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -164,7 +164,7 @@ public class StoreSnapshotFile {
try {
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir,
outputFile);
long begin = System.currentTimeMillis();
- String rootDir = Paths.get(snapshotDir).getParent().toString();
+ String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
@@ -200,7 +200,7 @@ public class StoreSnapshotFile {
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist",
diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
- String parentPath = Paths.get(dataPath).getParent().toString();
+ String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString();
String snapshotDir = Paths.get(parentPath,
StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
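The toAbsolutePath() calls added here (and in the files below) guard against getParent() returning null: a single-element relative path has no parent component, so the old getParent().toString() pattern could throw a NullPointerException. A minimal standalone illustration, not taken from this commit:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ParentDemo {
        public static void main(String[] args) {
            Path relative = Paths.get("snapshot");
            // Relative single-element path: no parent component at all
            System.out.println(relative.getParent());                  // null, so .toString() would NPE
            // Resolved against the working directory, a parent always exists
            System.out.println(relative.toAbsolutePath().getParent()); // e.g. /home/user/cwd
        }
    }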
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
index 389a2ecf4..b6809e6df 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
@@ -77,7 +77,14 @@ public class RpcForwarder {
public void setResponse(StoreCommandResponse response) {
if (response.getStatus()) {
LOG.debug("StoreCommandResponse status ok");
- future.complete(Status.OK(), () -> null);
+ // Forward the request to the raft leader and treat the operation as
+ // successful once it has been forwarded. A RaftClosure is returned
+ // because the calling logic expects one: when the current instance is
+ // itself the raft leader, the same logic runs locally and notifies the
+ // caller asynchronously via a RaftClosure, so the forwarding path
+ // mirrors that contract here.
+ RaftClosure<Status> supplierFuture = new RaftClosure<>();
+ supplierFuture.complete(Status.OK());
+ future.complete(Status.OK(), () -> supplierFuture);
} else {
LOG.debug("StoreCommandResponse status error");
Status status = new Status(RaftError.UNKNOWN,
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
index f4ff8b32e..bc0bc0be1 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
@@ -36,6 +36,7 @@ import org.apache.hugegraph.backend.page.PageIds;
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
@@ -69,7 +70,6 @@ import org.apache.hugegraph.exception.NotAllowException;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.iterator.Metadatable;
import org.apache.hugegraph.job.EphemeralJob;
-import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.job.system.DeleteExpiredJob;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.IndexLabel;
@@ -81,7 +81,6 @@ import org.apache.hugegraph.structure.HugeIndex;
import org.apache.hugegraph.structure.HugeIndex.IdWithExpiredTime;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
-import org.apache.hugegraph.task.HugeTask;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.HugeKeys;
@@ -115,15 +114,11 @@ public class GraphIndexTransaction extends AbstractTransaction {
conf.get(CoreOptions.QUERY_INDEX_INTERSECT_THRESHOLD);
}
- protected Id asyncRemoveIndexLeft(ConditionQuery query,
- HugeElement element) {
+ protected void asyncRemoveIndexLeft(ConditionQuery query,
+ HugeElement element) {
LOG.info("Remove left index: {}, query: {}", element, query);
RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
- HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
- .name(element.id().asString())
- .job(job)
- .schedule();
- return task.id();
+ this.params().submitEphemeralJob(job);
}
@Watched(prefix = "index")
@@ -1717,7 +1712,8 @@ public class GraphIndexTransaction extends AbstractTransaction {
}
}
- public static class RemoveLeftIndexJob extends EphemeralJob<Object> {
+ public static class RemoveLeftIndexJob extends EphemeralJob<Long>
+ implements EphemeralJobQueue.Reduce<Long> {
private static final String REMOVE_LEFT_INDEX = "remove_left_index";
@@ -1741,7 +1737,7 @@ public class GraphIndexTransaction extends AbstractTransaction {
}
@Override
- public Object execute() {
+ public Long execute() {
this.tx = this.element.schemaLabel().system() ?
this.params().systemTransaction().indexTransaction() :
this.params().graphTransaction().indexTransaction();
@@ -1780,7 +1776,6 @@ public class GraphIndexTransaction extends AbstractTransaction {
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
- this.tx.commit();
return rCount + sCount;
}
@@ -1808,7 +1803,6 @@ public class GraphIndexTransaction extends AbstractTransaction {
}
// Remove LeftIndex after constructing remove job
this.query.removeElementLeftIndex(element.id());
- this.tx.commit();
return count;
}
@@ -1873,11 +1867,9 @@ public class GraphIndexTransaction extends AbstractTransaction {
*/
this.tx.updateIndex(il.id(), element, false);
}
- this.tx.commit();
if (this.deletedByError(element, incorrectIndexFields,
incorrectPKs)) {
this.tx.updateIndex(il.id(), deletion, false);
- this.tx.commit();
} else {
count++;
}
@@ -1949,5 +1941,18 @@ public class GraphIndexTransaction extends AbstractTransaction {
return (HugeEdge) QueryResults.one(iter);
}
}
+
+ @Override
+ public Long reduce(Long t1, Long t2) {
+ if (t1 == null) {
+ return t2;
+ }
+
+ if (t2 == null) {
+ return t1;
+ }
+
+ return t1 + t2;
+ }
}
}
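RemoveLeftIndexJob now returns a Long count, and its Reduce implementation above lets the queue's consumer fold per-job counts into a single total, treating null as the empty accumulator. A small usage sketch (the lambda and loop are illustrative; only the Reduce interface and its null-safe summing semantics come from this commit):

    // Reduce<T> has a single abstract method, so a lambda can mirror
    // RemoveLeftIndexJob's reduce():
    EphemeralJobQueue.Reduce<Long> sum = (t1, t2) -> {
        if (t1 == null) {
            return t2;
        }
        if (t2 == null) {
            return t1;
        }
        return t1 + t2;
    };

    Long total = null;
    for (Long partial : new Long[]{3L, null, 5L}) {
        total = sum.reduce(total, partial);  // null-safe; ends with total == 8
    }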
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java
new file mode 100644
index 000000000..70f49073d
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.job.EphemeralJob;
+import org.apache.hugegraph.job.EphemeralJobBuilder;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+public class EphemeralJobQueue {
+
+ private static final Logger LOG = Log.logger(EphemeralJobQueue.class);
+
+ private static final long CAPACITY = 100 * Query.COMMIT_BATCH;
+
+ private final BlockingQueue<EphemeralJob<?>> pendingQueue;
+
+ private final AtomicReference<State> state;
+
+ private final HugeGraphParams graph;
+
+ private enum State {
+ INIT,
+ EXECUTE,
+ }
+
+ public EphemeralJobQueue(HugeGraphParams graph) {
+ this.state = new AtomicReference<>(State.INIT);
+ this.graph = graph;
+ this.pendingQueue = new ArrayBlockingQueue<>((int) CAPACITY);
+ }
+
+ public boolean add(EphemeralJob<?> job) {
+ if (job == null) {
+ return false;
+ }
+
+ if (!this.pendingQueue.offer(job)) {
+ LOG.warn("The pending queue of EphemeralJobQueue is full, {} job "
+
+ "will be ignored", job.type());
+ return false;
+ }
+
+ this.reScheduleIfNeeded();
+ return true;
+ }
+
+ protected HugeGraphParams params() {
+ return this.graph;
+ }
+
+ protected void clear() {
+ this.pendingQueue.clear();
+ }
+
+ protected EphemeralJob<?> poll() {
+ return this.pendingQueue.poll();
+ }
+
+ public void consumeComplete() {
+ this.state.compareAndSet(State.EXECUTE, State.INIT);
+ }
+
+ public void reScheduleIfNeeded() {
+ if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
+ try {
+ BatchEphemeralJob job = new BatchEphemeralJob(this);
+ EphemeralJobBuilder.of(this.graph.graph())
+ .name("batch-ephemeral-job")
+ .job(job)
+ .schedule();
+ } catch (Throwable e) {
+ // If scheduling fails, consider clearing all the data in the pendingQueue,
+ // or starting a scheduled retry task that retries until success.
+ LOG.warn("Failed to schedule BatchEphemeralJob", e);
+ this.pendingQueue.clear();
+ this.state.compareAndSet(State.EXECUTE, State.INIT);
+ }
+ }
+ }
+
+ public boolean isEmpty() {
+ return this.pendingQueue.isEmpty();
+ }
+
+ public static class BatchEphemeralJob extends EphemeralJob<Object> {
+
+ private static final long PAGE_SIZE = Query.COMMIT_BATCH;
+ private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
+ private static final long MAX_CONSUME_COUNT = 2 * PAGE_SIZE;
+
+ private WeakReference<EphemeralJobQueue> queueWeakReference;
+
+ public BatchEphemeralJob(EphemeralJobQueue queue) {
+ this.queueWeakReference = new WeakReference<>(queue);
+ }
+
+ @Override
+ public String type() {
+ return BATCH_EPHEMERAL_JOB;
+ }
+
+ @Override
+ public Object execute() throws Exception {
+ boolean stop = false;
+ Object result = null;
+ int consumeCount = 0;
+ InterruptedException interruptedException = null;
+ EphemeralJobQueue queue;
+ List<EphemeralJob<?>> batchJobs = new ArrayList<>();
+ while (!stop) {
+ if (interruptedException == null &&
Thread.currentThread().isInterrupted()) {
+ interruptedException = new InterruptedException();
+ }
+
+ queue = this.queueWeakReference.get();
+ if (queue == null) {
+ stop = true;
+ continue;
+ }
+
+ if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT ||
+ interruptedException != null) {
+ queue.consumeComplete();
+ stop = true;
+ if (!queue.isEmpty()) {
+ queue.reScheduleIfNeeded();
+ }
+ continue;
+ }
+
+ try {
+ while (!queue.isEmpty() && batchJobs.size() < PAGE_SIZE) {
+ EphemeralJob<?> job = queue.poll();
+ if (job == null) {
+ continue;
+ }
+ batchJobs.add(job);
+ }
+
+ if (batchJobs.isEmpty()) {
+ continue;
+ }
+
+ consumeCount += batchJobs.size();
+ result = this.executeBatchJob(batchJobs, result);
+
+ } catch (InterruptedException e) {
+ interruptedException = e;
+ } finally {
+ batchJobs.clear();
+ }
+ }
+
+ if (interruptedException != null) {
+ Thread.currentThread().interrupt();
+ throw interruptedException;
+ }
+
+ return result;
+ }
+
+ private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prevResult) throws Exception {
+ GraphTransaction graphTx = this.params().graphTransaction();
+ GraphTransaction systemTx = this.params().systemTransaction();
+ Object result = prevResult;
+ for (EphemeralJob<?> job : jobs) {
+ this.initJob(job);
+ Object obj = job.call();
+ if (job instanceof Reduce) {
+ result = ((Reduce) job).reduce(result, obj);
+ }
+ }
+
+ graphTx.commit();
+ systemTx.commit();
+
+ return result;
+ }
+
+ private void initJob(EphemeralJob<?> job) {
+ job.graph(this.graph());
+ job.params(this.params());
+ }
+
+ @Override
+ public Object call() throws Exception {
+ try {
+ return super.call();
+ } catch (Throwable e) {
+ LOG.warn("Failed to execute BatchEphemeralJob", e);
+ EphemeralJobQueue queue = this.queueWeakReference.get();
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ if (queue != null) {
+ queue.clear();
+ queue.consumeComplete();
+ }
+ throw e;
+ }
+
+ if (queue != null) {
+ queue.consumeComplete();
+ if (!queue.isEmpty()) {
+ queue.reScheduleIfNeeded();
+ }
+ }
+ throw e;
+ }
+ }
+ }
+
+ public interface Reduce<T> {
+ T reduce(T t1, T t2);
+ }
+}
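The INIT/EXECUTE AtomicReference above is what makes this a single-consumer queue: whichever producer wins the INIT -> EXECUTE compare-and-set schedules the one BatchEphemeralJob, and consumeComplete() flips the state back, after which the consumer re-checks the queue to close the race with late producers. A stripped-down sketch of that handoff (hypothetical class; raw threads stand in for the job scheduler, batching elided):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    class SingleConsumerQueue<T> {

        private final Queue<T> pending = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean executing = new AtomicBoolean(false);

        public void add(T task) {
            this.pending.offer(task);
            this.scheduleIfNeeded();
        }

        private void scheduleIfNeeded() {
            // Only the caller that wins the CAS starts a consumer,
            // like State.INIT -> State.EXECUTE in reScheduleIfNeeded()
            if (this.executing.compareAndSet(false, true)) {
                new Thread(this::drain).start();
            }
        }

        private void drain() {
            T task;
            while ((task = this.pending.poll()) != null) {
                // consume task here
            }
            this.executing.set(false); // like consumeComplete()
            // A producer may have enqueued after the last poll() but
            // before the flag reset, losing its CAS; re-check for it
            if (!this.pending.isEmpty()) {
                this.scheduleIfNeeded();
            }
        }
    }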
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
index 2a01ba6a0..0f5c179f4 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
@@ -145,7 +145,7 @@ public final class CompressUtil {
Files.createDirectories(newPath);
} else {
// check parent folder again
- Path parent = newPath.getParent();
+ Path parent = newPath.toAbsolutePath().getParent();
if (parent != null) {
if (Files.notExists(parent)) {
Files.createDirectories(parent);
@@ -176,7 +176,7 @@ public final class CompressUtil {
public static void compressZip(String inputDir, String outputFile,
Checksum checksum) throws IOException {
- String rootDir = Paths.get(inputDir).getParent().toString();
+ String rootDir = Paths.get(inputDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(inputDir).getFileName().toString();
compressZip(rootDir, sourceDir, outputFile, checksum);
}
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
index 3ae6ba3fe..91e02878a 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
@@ -99,7 +99,7 @@ public class OpenedRocksDB implements AutoCloseable {
}
public void createCheckpoint(String targetPath) {
- Path parentName = Paths.get(targetPath).getParent().getFileName();
+ Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName();
assert parentName.toString().startsWith("snapshot") : targetPath;
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index 71a66906d..bcbe37b7c 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -295,9 +295,9 @@ public class RocksDBStdSessions extends RocksDBSessions {
public String buildSnapshotPath(String snapshotPrefix) {
// Like: parent_path/rocksdb-data/*, * can be g,m,s
Path originDataPath = Paths.get(this.dataPath);
- Path parentParentPath = originDataPath.getParent().getParent();
+ Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
- Path pureDataPath = parentParentPath.relativize(originDataPath);
+ Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath());
// Like: parent_path/snapshot_rocksdb-data/*
Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
pureDataPath);
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 4158c7d83..2dba5fa76 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -729,7 +729,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
for (Map.Entry<String, RocksDBSessions> entry :
this.dbs.entrySet()) {
// Like: parent_path/rocksdb-data/*, * maybe g,m,s
Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath();
- Path parentParentPath = originDataPath.getParent().getParent();
+ Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
Path pureDataPath = parentParentPath.relativize(originDataPath);
// Like: parent_path/snapshot_rocksdb-data/*
@@ -740,7 +740,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
RocksDBSessions sessions = entry.getValue();
sessions.createSnapshot(snapshotPath.toString());
- String snapshotDir = snapshotPath.getParent().toString();
+ String snapshotDir = snapshotPath.toAbsolutePath().getParent().toString();
// Find correspond data HugeType key
String diskTableKey = this.findDiskTableKeyByPath(
entry.getKey());
@@ -781,7 +781,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
if (deleteSnapshot) {
// Delete empty snapshot parent directory
- Path parentPath = Paths.get(snapshotPath).getParent();
+ Path parentPath = Paths.get(snapshotPath).toAbsolutePath().getParent();
if (Files.list(parentPath).count() == 0) {
FileUtils.deleteDirectory(parentPath.toFile());
}
@@ -866,7 +866,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
diskMapping.put(TABLE_GENERAL_KEY, this.dataPath);
for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet())
{
String key = this.store + "/" + e.getKey().name();
- String value = Paths.get(e.getValue()).getParent().toString();
+ String value = Paths.get(e.getValue()).toAbsolutePath().getParent().toString();
diskMapping.put(key, value);
}
return diskMapping;
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
index 393cb2ef1..3d2b7f867 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
@@ -108,7 +108,7 @@ public class RocksDBSstSessions extends RocksDBSessions {
Path sstFile = Paths.get(this.dataPath, table,
number + RocksDBIngester.SST);
try {
- FileUtils.forceMkdir(sstFile.getParent().toFile());
+ FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile());
} catch (IOException e) {
throw new BackendException("Can't make directory for sst: '%s'",
e, sstFile.toString());