This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new af59fc882a7 [IOTDB-6043] Pipe: a framework to support wal hardlink
mode (#10440) (#10694)
af59fc882a7 is described below
commit af59fc882a7f1dfe52c529250d1b47081fe57f45
Author: Itami Sho <[email protected]>
AuthorDate: Fri Jul 28 11:15:29 2023 +0800
[IOTDB-6043] Pipe: a framework to support wal hardlink mode (#10440)
(#10694)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit d4c643c377832f02391eff49a674cd6cc440c073)
---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 2 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +-
.../PipeHardlinkFileDirStartupCleaner.java | 10 +-
.../db/pipe/resource/PipeResourceManager.java | 31 +++-
.../PipeTsFileResourceManager.java} | 25 +--
.../db/pipe/resource/wal/PipeWALResource.java | 26 ++--
.../pipe/resource/wal/PipeWALResourceManager.java | 46 ++++--
.../wal/hardlink/PipeWALHardlinkResource.java | 47 ++++++
.../hardlink/PipeWALHardlinkResourceManager.java | 167 +++++++++++++++++++++
.../wal/selfhost/PipeWALSelfHostResource.java | 41 +++++
.../selfhost/PipeWALSelfHostResourceManager.java} | 32 ++--
.../dataregion/wal/checkpoint/MemTableInfo.java | 4 +
.../storageengine/dataregion/wal/node/WALNode.java | 5 +-
.../dataregion/wal/utils/WALEntryHandler.java | 37 ++++-
...est.java => PipeTsFileResourceManagerTest.java} | 61 ++++----
.../PipeWALHardlinkResourceManagerTest.java | 114 ++++++++++++++
.../resources/conf/iotdb-common.properties | 16 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 36 ++++-
.../iotdb/commons/conf/CommonDescriptor.java | 10 ++
.../iotdb/commons/pipe/config/PipeConfig.java | 22 +++
21 files changed, 620 insertions(+), 120 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index a8c966ad408..391bdc97ba2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import
org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
+import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index abb48185db3..7162cdabeca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -75,7 +75,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
try {
- PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(),
walEntryHandler);
+ PipeResourceManager.wal().pin(walEntryHandler);
return true;
} catch (Exception e) {
LOGGER.warn(
@@ -90,7 +90,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
try {
- PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
+ PipeResourceManager.wal().unpin(walEntryHandler);
return true;
} catch (Exception e) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 7f164ab44fb..bb260bf22ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -109,7 +109,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
@Override
public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
try {
- tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
+ tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile,
true);
return true;
} catch (Exception e) {
LOGGER.warn(
@@ -124,7 +124,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
@Override
public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
try {
- PipeResourceManager.file().decreaseFileReference(tsFile);
+ PipeResourceManager.tsfile().decreaseFileReference(tsFile);
return true;
} catch (Exception e) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeHardlinkFileDirStartupCleaner.java
similarity index 87%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeHardlinkFileDirStartupCleaner.java
index 1acadceb5fd..992d599703f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeHardlinkFileDirStartupCleaner.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource.file;
+package org.apache.iotdb.db.pipe.resource;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,14 +39,14 @@ public class PipeHardlinkFileDirStartupCleaner {
* PipeConfig.PIPE_TSFILE_DIR_NAME directory.
*/
public static void clean() {
- for (String dataDir :
IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
- for (File file :
+ for (final String dataDir :
IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
+ for (final File file :
FileUtils.listFilesAndDirs(
new File(dataDir), DirectoryFileFilter.INSTANCE,
DirectoryFileFilter.INSTANCE)) {
if (file.isDirectory()
- &&
file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName()))
{
+ &&
file.getName().equals(PipeConfig.getInstance().getPipeHardlinkBaseDirName())) {
LOGGER.info(
- "pipe hardlink tsfile dir found, deleting it: {}, result: {}",
+ "pipe hardlink dir found, deleting it: {}, result: {}",
file,
FileUtils.deleteQuietly(file));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 43bddd872f6..519aa0b56ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -19,27 +19,42 @@
package org.apache.iotdb.db.pipe.resource;
-import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import
org.apache.iotdb.db.pipe.resource.wal.hardlink.PipeWALHardlinkResourceManager;
+import
org.apache.iotdb.db.pipe.resource.wal.selfhost.PipeWALSelfHostResourceManager;
+
+import java.util.concurrent.atomic.AtomicReference;
public class PipeResourceManager {
- private final PipeFileResourceManager pipeFileResourceManager;
- private final PipeWALResourceManager pipeWALResourceManager;
+ private final PipeTsFileResourceManager pipeTsFileResourceManager;
+ private final AtomicReference<PipeWALResourceManager> pipeWALResourceManager;
- public static PipeFileResourceManager file() {
- return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
+ public static PipeTsFileResourceManager tsfile() {
+ return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
}
public static PipeWALResourceManager wal() {
- return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
+ if (PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.get() ==
null) {
+ synchronized (PipeResourceManagerHolder.INSTANCE) {
+ if (PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.get() ==
null) {
+ PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.set(
+ PipeConfig.getInstance().getPipeHardLinkWALEnabled()
+ ? new PipeWALHardlinkResourceManager()
+ : new PipeWALSelfHostResourceManager());
+ }
+ }
+ }
+ return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.get();
}
///////////////////////////// SINGLETON /////////////////////////////
private PipeResourceManager() {
- pipeFileResourceManager = new PipeFileResourceManager();
- pipeWALResourceManager = new PipeWALResourceManager();
+ pipeTsFileResourceManager = new PipeTsFileResourceManager();
+ pipeWALResourceManager = new AtomicReference<>();
}
private static class PipeResourceManagerHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
similarity index 90%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 4b04225cf58..08f1e7db2e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -17,11 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource.file;
+package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.utils.FileUtils;
import java.io.File;
import java.io.IOException;
@@ -31,7 +30,7 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
-public class PipeFileResourceManager {
+public class PipeTsFileResourceManager {
private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new
HashMap<>();
@@ -98,11 +97,12 @@ public class PipeFileResourceManager {
private static String getPipeTsFileDirPath(File file) throws IOException {
while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
- && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
- &&
!file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName()))
{
+ && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)) {
file = file.getParentFile();
}
return file.getParentFile().getCanonicalPath()
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ File.separator
+ PipeConfig.getInstance().getPipeHardlinkTsFileDirName();
}
@@ -110,8 +110,7 @@ public class PipeFileResourceManager {
private static String getRelativeFilePath(File file) {
StringBuilder builder = new StringBuilder(file.getName());
while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
- && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
- &&
!file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName()))
{
+ && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)) {
file = file.getParentFile();
builder =
new StringBuilder(file.getName())
@@ -165,18 +164,6 @@ public class PipeFileResourceManager {
}
}
- /**
- * clear all hardlink or copied files under pipe dir of the given data dir.
- *
- * <p>this method can be only invoked when the system is booting up.
- */
- public synchronized void clear(String dataDir) {
- File pipeTsFileDir = new File(dataDir,
PipeConfig.getInstance().getPipeHardlinkTsFileDirName());
- if (pipeTsFileDir.exists()) {
- FileUtils.deleteDirectory(pipeTsFileDir);
- }
- }
-
/**
* get the reference count of the file.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 27b79ec42a5..085c71d5f9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -32,11 +32,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class PipeWALResource implements Closeable {
+public abstract class PipeWALResource implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeWALResource.class);
- private final WALEntryHandler walEntryHandler;
+ protected final WALEntryHandler walEntryHandler;
private final AtomicInteger referenceCount;
@@ -44,7 +44,7 @@ public class PipeWALResource implements Closeable {
private final AtomicLong lastLogicalPinTime;
private final AtomicBoolean isPhysicallyPinned;
- public PipeWALResource(WALEntryHandler walEntryHandler) {
+ protected PipeWALResource(WALEntryHandler walEntryHandler) {
this.walEntryHandler = walEntryHandler;
referenceCount = new AtomicInteger(0);
@@ -53,11 +53,11 @@ public class PipeWALResource implements Closeable {
isPhysicallyPinned = new AtomicBoolean(false);
}
- public void pin() throws PipeRuntimeNonCriticalException {
+ public final void pin() throws PipeRuntimeNonCriticalException {
if (referenceCount.get() == 0) {
if (!isPhysicallyPinned.get()) {
try {
- walEntryHandler.pinMemTable();
+ pinInternal();
} catch (MemTablePinException e) {
throw new PipeRuntimeNonCriticalException(
String.format(
@@ -75,7 +75,10 @@ public class PipeWALResource implements Closeable {
referenceCount.incrementAndGet();
}
- public void unpin() throws PipeRuntimeNonCriticalException {
+ protected abstract void pinInternal()
+ throws MemTablePinException, PipeRuntimeNonCriticalException;
+
+ public final void unpin() throws PipeRuntimeNonCriticalException {
final int finalReferenceCount = referenceCount.get();
if (finalReferenceCount == 1) {
@@ -90,12 +93,15 @@ public class PipeWALResource implements Closeable {
referenceCount.decrementAndGet();
}
+ protected abstract void unpinInternal()
+ throws MemTablePinException, PipeRuntimeNonCriticalException;
+
/**
* Invalidate the wal if it is unpinned and out of time to live.
*
* @return true if the wal is invalidated, false otherwise
*/
- public boolean invalidateIfPossible() {
+ public final boolean invalidateIfPossible() {
if (referenceCount.get() > 0) {
return false;
}
@@ -114,7 +120,7 @@ public class PipeWALResource implements Closeable {
if (isPhysicallyPinned.get()) {
if (System.currentTimeMillis() - lastLogicalPinTime.get() >
MIN_TIME_TO_LIVE_IN_MS) {
try {
- walEntryHandler.unpinMemTable();
+ unpinInternal();
} catch (MemTablePinException e) {
throw new PipeRuntimeNonCriticalException(
String.format(
@@ -138,10 +144,10 @@ public class PipeWALResource implements Closeable {
}
@Override
- public void close() {
+ public final void close() {
if (isPhysicallyPinned.get()) {
try {
- walEntryHandler.unpinMemTable();
+ unpinInternal();
} catch (MemTablePinException e) {
LOGGER.error(
"failed to unpin wal {} when closing pipe wal resource, because
{}",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index e23f2e897d5..64cd83f1a67 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -1,10 +1,30 @@
-/* * Licensed to the Apache Software Foundation (ASF) under one * or more
contributor license agreements. See the NOTICE file * distributed with this
work for additional information * regarding copyright ownership. The ASF
licenses this file * to you under the Apache License, Version 2.0 (the *
"License"); you may not use this file except in compliance * with the License.
You may obtain a copy of the License at * *
http://www.apache.org/licenses/LICENSE-2.0 * * Unless r [...]
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -12,9 +32,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-public class PipeWALResourceManager {
+public abstract class PipeWALResourceManager {
- private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
+ protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
private static final int SEGMENT_LOCK_COUNT = 32;
private final ReentrantLock[] memtableIdSegmentLocks;
@@ -23,7 +43,7 @@ public class PipeWALResourceManager {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
- public PipeWALResourceManager() {
+ protected PipeWALResourceManager() {
// memtableIdToPipeWALResourceMap can be concurrently accessed by multiple
threads
memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
@@ -57,27 +77,33 @@ public class PipeWALResourceManager {
TimeUnit.MILLISECONDS);
}
- public void pin(long memtableId, WALEntryHandler walEntryHandler) {
+ public final void pin(final WALEntryHandler walEntryHandler) throws
IOException {
+ final long memtableId = walEntryHandler.getMemTableId();
final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
- memtableIdToPipeWALResourceMap
- .computeIfAbsent(memtableId, id -> new
PipeWALResource(walEntryHandler))
- .pin();
+ pinInternal(memtableId, walEntryHandler);
} finally {
lock.unlock();
}
}
- public void unpin(long memtableId) {
+ protected abstract void pinInternal(long memtableId, WALEntryHandler
walEntryHandler)
+ throws IOException;
+
+ public final void unpin(final WALEntryHandler walEntryHandler) throws
IOException {
+ final long memtableId = walEntryHandler.getMemTableId();
final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
- memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+ unpinInternal(memtableId, walEntryHandler);
} finally {
lock.unlock();
}
}
+
+ protected abstract void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler)
+ throws IOException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResource.java
new file mode 100644
index 00000000000..f1ad513ccc4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResource.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal.hardlink;
+
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResource;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+
+public class PipeWALHardlinkResource extends PipeWALResource {
+
+ private final PipeWALHardlinkResourceManager resourceManager;
+
+ protected PipeWALHardlinkResource(
+ WALEntryHandler walEntryHandler, PipeWALHardlinkResourceManager
resourceManager) {
+ super(walEntryHandler);
+ this.resourceManager = resourceManager;
+ }
+
+ @Override
+ protected void pinInternal() throws MemTablePinException {
+ // TODO: hardlink
+ walEntryHandler.pinMemTable();
+ }
+
+ @Override
+ protected void unpinInternal() throws MemTablePinException {
+ // TODO: hardlink
+ walEntryHandler.unpinMemTable();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
new file mode 100644
index 00000000000..acd64cf23b4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal.hardlink;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeWALHardlinkResourceManager extends PipeWALResourceManager {
+
+ @Override
+ protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler)
{
+ memtableIdToPipeWALResourceMap
+ .computeIfAbsent(memtableId, id -> new
PipeWALHardlinkResource(walEntryHandler, this))
+ .pin();
+ }
+
+ @Override
+ protected void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler) {
+ memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+ }
+
+ //////////////////////////// hardlink related ////////////////////////////
+
+ private final Map<String, Integer> hardlinkToReferenceMap = new HashMap<>();
+
+ /**
+ * given a file, create a hardlink, maintain a reference count for the
hardlink, and return the
+ * hardlink.
+ *
+ * <p>if the given file is already a hardlink, increase its reference count
and return it.
+ *
+ * <p>if the given file is a wal, create a hardlink in pipe dir, increase
the reference count of
+ * the hardlink and return it.
+ *
+ * @param file wal file. can be original file or the hardlink of original
file
+ * @return the hardlink
+ * @throws IOException when create hardlink failed
+ */
+ public synchronized File increaseFileReference(File file) throws IOException
{
+ // if the file is already a hardlink, just increase reference count and
return it
+ if (increaseReferenceIfExists(file.getPath())) {
+ return file;
+ }
+
+ // if the file is not a hardlink, check if there is a related hardlink in
pipe dir. if so,
+ // increase reference count and return it.
+ final File hardlink = getHardlinkInPipeWALDir(file);
+ if (increaseReferenceIfExists(hardlink.getPath())) {
+ return hardlink;
+ }
+
+ // if the file is a wal, and there is no related hardlink in pipe dir,
create a hardlink to pipe
+ // dir, maintain a reference count for the hardlink, and return the
hardlink.
+ hardlinkToReferenceMap.put(hardlink.getPath(), 1);
+ return createHardlink(file, hardlink);
+ }
+
+ private boolean increaseReferenceIfExists(String path) {
+ hardlinkToReferenceMap.computeIfPresent(path, (k, v) -> v + 1);
+ return hardlinkToReferenceMap.containsKey(path);
+ }
+
+ // TODO: Check me! Make sure the file is not a hardlink.
+ // TODO: IF user specify a wal by config, will the method work?
+ private static File getHardlinkInPipeWALDir(File file) throws IOException {
+ try {
+ return new File(getPipeWALDirPath(file), getRelativeFilePath(file));
+ } catch (Exception e) {
+ throw new IOException(
+ String.format(
+ "failed to get hardlink in pipe dir " + "for file %s, it is not
a wal",
+ file.getPath()),
+ e);
+ }
+ }
+
+ private static String getPipeWALDirPath(File file) throws IOException {
+ while (!file.getName().equals(IoTDBConstant.WAL_FOLDER_NAME)) {
+ file = file.getParentFile();
+ }
+
+ return file.getParentFile().getCanonicalPath()
+ + File.separator
+ + IoTDBConstant.DATA_FOLDER_NAME
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkWALDirName();
+ }
+
+ private static String getRelativeFilePath(File file) {
+ StringBuilder builder = new StringBuilder(file.getName());
+ while
(!file.getParentFile().getName().equals(IoTDBConstant.WAL_FOLDER_NAME)) {
+ file = file.getParentFile();
+ builder =
+ new StringBuilder(file.getName())
+ .append(IoTDBConstant.FILE_NAME_SEPARATOR)
+ .append(builder);
+ }
+ return builder.toString();
+ }
+
+ private static File createHardlink(File sourceFile, File hardlink) throws
IOException {
+ if (!hardlink.getParentFile().exists() &&
!hardlink.getParentFile().mkdirs()) {
+ throw new IOException(
+ String.format(
+ "failed to create hardlink %s for file %s: failed to create
parent dir %s",
+ hardlink.getPath(), sourceFile.getPath(),
hardlink.getParentFile().getPath()));
+ }
+
+ final Path sourcePath =
FileSystems.getDefault().getPath(sourceFile.getAbsolutePath());
+ final Path linkPath =
FileSystems.getDefault().getPath(hardlink.getAbsolutePath());
+ Files.createLink(linkPath, sourcePath);
+ return hardlink;
+ }
+
+ /**
+ * given a hardlink, decrease its reference count, if the reference count is
0, delete the file.
+ * if the given file is not a hardlink, do nothing.
+ *
+ * @param hardlink the hardlinked file
+ * @throws IOException when delete file failed
+ */
+ public synchronized void decreaseFileReference(File hardlink) throws
IOException {
+ final Integer updatedReference =
+ hardlinkToReferenceMap.computeIfPresent(
+ hardlink.getPath(), (file, reference) -> reference - 1);
+
+ if (updatedReference != null && updatedReference == 0) {
+ Files.deleteIfExists(hardlink.toPath());
+ hardlinkToReferenceMap.remove(hardlink.getPath());
+ }
+ }
+
+ @TestOnly
+ public synchronized int getFileReferenceCount(File hardlink) {
+ return hardlinkToReferenceMap.getOrDefault(hardlink.getPath(), 0);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResource.java
new file mode 100644
index 00000000000..e8e03e64a16
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal.selfhost;
+
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResource;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+
+public class PipeWALSelfHostResource extends PipeWALResource {
+
+ public PipeWALSelfHostResource(WALEntryHandler walEntryHandler) {
+ super(walEntryHandler);
+ }
+
+ @Override
+ protected void pinInternal() throws MemTablePinException {
+ walEntryHandler.pinMemTable();
+ }
+
+ @Override
+ protected void unpinInternal() throws MemTablePinException {
+ walEntryHandler.unpinMemTable();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
index 43bddd872f6..94404eafbed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
@@ -17,32 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource;
+package org.apache.iotdb.db.pipe.resource.wal.selfhost;
-import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
-public class PipeResourceManager {
+public class PipeWALSelfHostResourceManager extends PipeWALResourceManager {
- private final PipeFileResourceManager pipeFileResourceManager;
- private final PipeWALResourceManager pipeWALResourceManager;
-
- public static PipeFileResourceManager file() {
- return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
- }
-
- public static PipeWALResourceManager wal() {
- return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
- }
-
- ///////////////////////////// SINGLETON /////////////////////////////
-
- private PipeResourceManager() {
- pipeFileResourceManager = new PipeFileResourceManager();
- pipeWALResourceManager = new PipeWALResourceManager();
+ @Override
+ protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler)
{
+ memtableIdToPipeWALResourceMap
+ .computeIfAbsent(memtableId, id -> new
PipeWALSelfHostResource(walEntryHandler))
+ .pin();
}
- private static class PipeResourceManagerHolder {
- private static final PipeResourceManager INSTANCE = new
PipeResourceManager();
+ @Override
+ protected void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler) {
+ memtableIdToPipeWALResourceMap.get(memtableId).unpin();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
index 322634980b5..75ab7d9f6b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
@@ -116,6 +116,10 @@ public class MemTableInfo implements SerializedSize {
return pinCount > 0;
}
+ public int getPinCount() {
+ return pinCount;
+ }
+
public boolean isFlushed() {
return flushed;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index e8c34f82967..8ba4b3149a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -372,11 +372,12 @@ public class WALNode implements IWALNode {
}
if (oldestMemTableInfo.isPinned()) {
logger.warn(
- "Pipe: Effective information ratio {} of wal node-{} is below wal
min effective info ratio {}. But fail to delete memTable-{}'s wal files because
they are pinned by the Pipe module.",
+ "Pipe: Effective information ratio {} of wal node-{} is below wal
min effective info ratio {}. But fail to delete memTable-{}'s wal files because
they are pinned by the Pipe module. Pin count: {}.",
effectiveInfoRatio,
identifier,
config.getWalMinEffectiveInfoRatio(),
- oldestMemTableInfo.getMemTableId());
+ oldestMemTableInfo.getMemTableId(),
+ oldestMemTableInfo.getPinCount());
return false;
}
IMemTable oldestMemTable = oldestMemTableInfo.getMemTable();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index 27b38844f33..abcf3643ce2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -28,11 +28,15 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* This handler is used by the Pipe to find the corresponding insert node.
Besides, it can try to
* pin/unpin the wal entries by the memTable id.
*/
public class WALEntryHandler {
+
private static final Logger logger =
LoggerFactory.getLogger(WALEntryHandler.class);
private long memTableId = -1;
@@ -46,6 +50,9 @@ public class WALEntryHandler {
// wal node, null when wal is disabled
private WALNode walNode = null;
+ private volatile boolean isHardlink = false;
+ private final AtomicReference<File> hardlinkFile = new AtomicReference<>();
+
public WALEntryHandler(WALEntryValue value) {
this.value = value;
}
@@ -90,6 +97,7 @@ public class WALEntryHandler {
throw new WALPipeException("Fail to get value because the entry type
isn't InsertNode.");
}
}
+
// wait until the position is ready
while (!walEntryPosition.canRead()) {
try {
@@ -101,14 +109,8 @@ public class WALEntryHandler {
Thread.currentThread().interrupt();
}
}
- // read from the wal file
- InsertNode node = null;
- try {
- node = walEntryPosition.readInsertNodeViaCache();
- } catch (Exception e) {
- throw new WALPipeException("Fail to get value because the file content
isn't correct.", e);
- }
+ final InsertNode node = isHardlink ? readFromHardlinkFile() :
readFromOriginalWALFile();
if (node == null) {
throw new WALPipeException(
String.format("Fail to get the wal value of the position %s.",
walEntryPosition));
@@ -116,6 +118,22 @@ public class WALEntryHandler {
return node;
}
+ private InsertNode readFromOriginalWALFile() throws WALPipeException {
+ try {
+ return walEntryPosition.readInsertNodeViaCache();
+ } catch (Exception e) {
+ throw new WALPipeException("Fail to get value because the file content
isn't correct.", e);
+ }
+ }
+
+ private InsertNode readFromHardlinkFile() throws WALPipeException {
+ try {
+ return walEntryPosition.readInsertNodeViaCache();
+ } catch (Exception e) {
+ throw new WALPipeException("Fail to get value because the file content
isn't correct.", e);
+ }
+ }
+
public long getMemTableId() {
return memTableId;
}
@@ -149,6 +167,11 @@ public class WALEntryHandler {
this.walEntryPosition.setSize(size);
}
+ public void hardlinkTo(File hardlinkFile) {
+ isHardlink = true;
+ this.hardlinkFile.set(hardlinkFile);
+ }
+
@Override
public String toString() {
return "WALEntryHandler{"
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
similarity index 74%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
rename to
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
index 390ba19f53f..ef9496901f3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -48,7 +48,7 @@ import java.nio.file.Files;
import static org.junit.Assert.fail;
-public class PipeFileResourceManagerTest {
+public class PipeTsFileResourceManagerTest {
private static final String ROOT_DIR = "target" + File.separator +
"PipeTsFileHolderTest";
private static final String SEQUENCE_DIR =
@@ -56,11 +56,11 @@ public class PipeFileResourceManagerTest {
private static final String TS_FILE_NAME = SEQUENCE_DIR + File.separator +
"test.tsfile";
private static final String MODS_FILE_NAME = TS_FILE_NAME + ".mods";
- private PipeFileResourceManager pipeFileResourceManager;
+ private PipeTsFileResourceManager pipeTsFileResourceManager;
@Before
public void setUp() throws Exception {
- pipeFileResourceManager = new PipeFileResourceManager();
+ pipeTsFileResourceManager = new PipeTsFileResourceManager();
createTsfile(TS_FILE_NAME);
creatModsFile(MODS_FILE_NAME);
@@ -151,27 +151,32 @@ public class PipeFileResourceManagerTest {
public void testIncreaseTsfile() throws IOException {
File originTsfile = new File(TS_FILE_NAME);
File originModFile = new File(MODS_FILE_NAME);
- Assert.assertEquals(0,
pipeFileResourceManager.getFileReferenceCount(originTsfile));
- Assert.assertEquals(0,
pipeFileResourceManager.getFileReferenceCount(originModFile));
+ Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(originTsfile));
+ Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(originModFile));
- File pipeTsfile =
pipeFileResourceManager.increaseFileReference(originTsfile, true);
- File pipeModFile =
pipeFileResourceManager.increaseFileReference(originModFile, false);
- Assert.assertEquals(1,
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(1,
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+ File pipeTsfile =
pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
+ File pipeModFile =
pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+ Assert.assertEquals(1,
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(1,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertTrue(Files.exists(originTsfile.toPath()));
Assert.assertTrue(Files.exists(originModFile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
+ pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
+ pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+ Assert.assertEquals(2,
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(2,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+
// test use hardlinkTsFile to increase reference counts
- pipeFileResourceManager.increaseFileReference(pipeTsfile, true);
- Assert.assertEquals(2,
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+ pipeTsFileResourceManager.increaseFileReference(pipeTsfile, true);
+ Assert.assertEquals(3,
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
Assert.assertTrue(Files.exists(originTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
// test use copyFile to increase reference counts
- pipeFileResourceManager.increaseFileReference(pipeModFile, false);
- Assert.assertEquals(2,
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+ pipeTsFileResourceManager.increaseFileReference(pipeModFile, false);
+ Assert.assertEquals(3,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertTrue(Files.exists(originModFile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
}
@@ -181,15 +186,15 @@ public class PipeFileResourceManagerTest {
File originFile = new File(TS_FILE_NAME);
File originModFile = new File(MODS_FILE_NAME);
- pipeFileResourceManager.decreaseFileReference(originFile);
- pipeFileResourceManager.decreaseFileReference(originModFile);
- Assert.assertEquals(0,
pipeFileResourceManager.getFileReferenceCount(originFile));
- Assert.assertEquals(0,
pipeFileResourceManager.getFileReferenceCount(originModFile));
+ pipeTsFileResourceManager.decreaseFileReference(originFile);
+ pipeTsFileResourceManager.decreaseFileReference(originModFile);
+ Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(originFile));
+ Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(originModFile));
- File pipeTsfile =
pipeFileResourceManager.increaseFileReference(originFile, true);
- File pipeModFile =
pipeFileResourceManager.increaseFileReference(originModFile, false);
- Assert.assertEquals(1,
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(1,
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+ File pipeTsfile =
pipeTsFileResourceManager.increaseFileReference(originFile, true);
+ File pipeModFile =
pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+ Assert.assertEquals(1,
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(1,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
@@ -200,17 +205,17 @@ public class PipeFileResourceManagerTest {
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
- Assert.assertEquals(1,
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(1,
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+ Assert.assertEquals(1,
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(1,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
- pipeFileResourceManager.decreaseFileReference(pipeTsfile);
- pipeFileResourceManager.decreaseFileReference(pipeModFile);
- Assert.assertEquals(0,
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(0,
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+ pipeTsFileResourceManager.decreaseFileReference(pipeTsfile);
+ pipeTsFileResourceManager.decreaseFileReference(pipeModFile);
+ Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
Assert.assertFalse(Files.exists(pipeTsfile.toPath()));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeWALHardlinkResourceManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeWALHardlinkResourceManagerTest.java
new file mode 100644
index 00000000000..4427113075e
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeWALHardlinkResourceManagerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.FileUtils;
+import
org.apache.iotdb.db.pipe.resource.wal.hardlink.PipeWALHardlinkResourceManager;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class PipeWALHardlinkResourceManagerTest {
+ private static final String ROOT_DIR = "target" + File.separator +
"PipeWALHolderTest";
+
+ private static final String WAL_DIR = ROOT_DIR + File.separator +
IoTDBConstant.WAL_FOLDER_NAME;
+
+ private static final String WAL_NAME = WAL_DIR + File.separator + "test.wal";
+
+ private PipeWALHardlinkResourceManager pipeWALHardlinkResourceManager;
+
+ @Before
+ public void setUp() throws Exception {
+ pipeWALHardlinkResourceManager = new PipeWALHardlinkResourceManager();
+
+ createWAL();
+ }
+
+ private void createWAL() {
+ File file = new File(WAL_NAME);
+ if (file.exists()) {
+ boolean ignored = file.delete();
+ }
+
+ try {
+ file.getParentFile().mkdirs();
+ file.createNewFile();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ File pipeFolder = new File(ROOT_DIR);
+ if (pipeFolder.exists()) {
+ FileUtils.deleteDirectory(pipeFolder);
+ }
+ }
+
+ @Test
+ public void testIncreaseTsfile() throws IOException {
+ File originWALFile = new File(WAL_NAME);
+ Assert.assertEquals(0,
pipeWALHardlinkResourceManager.getFileReferenceCount(originWALFile));
+
+ File pipeWALFile =
pipeWALHardlinkResourceManager.increaseFileReference(originWALFile);
+ Assert.assertEquals(1,
pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+ Assert.assertTrue(Files.exists(originWALFile.toPath()));
+ Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+
+ // test use hardlinkTsFile to increase reference counts
+ pipeWALHardlinkResourceManager.increaseFileReference(pipeWALFile);
+ Assert.assertEquals(2,
pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+ Assert.assertTrue(Files.exists(originWALFile.toPath()));
+ Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+ }
+
+ @Test
+ public void testDecreaseTsfile() throws IOException {
+ File originFile = new File(WAL_NAME);
+
+ pipeWALHardlinkResourceManager.decreaseFileReference(originFile);
+ Assert.assertEquals(0,
pipeWALHardlinkResourceManager.getFileReferenceCount(originFile));
+
+ File pipeWALFile =
pipeWALHardlinkResourceManager.increaseFileReference(originFile);
+ Assert.assertEquals(1,
pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+ Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+ Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+
+ Assert.assertTrue(originFile.delete());
+ Assert.assertFalse(Files.exists(originFile.toPath()));
+
+ Assert.assertEquals(1,
pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+ Assert.assertFalse(Files.exists(originFile.toPath()));
+ Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+
+ pipeWALHardlinkResourceManager.decreaseFileReference(pipeWALFile);
+ Assert.assertEquals(0,
pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+ Assert.assertFalse(Files.exists(originFile.toPath()));
+ Assert.assertFalse(Files.exists(pipeWALFile.toPath()));
+ }
+}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index b5eedff05fb..98104de3b5b 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -952,9 +952,21 @@ cluster_name=defaultCluster
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
-# The name of the directory that stores the tsfiles temporarily hold or
generated by the pipe module.
+# The name of the directory that stores the files temporarily hold or
generated by the pipe module.
# The directory is located in the data directory of IoTDB.
-# pipe_hardlink_tsfile_dir_name=pipe
+# pipe_hardlink_base_dir_name=pipe
+
+# The name of the directory that stores the tsfiles temporarily hold or
generated by the pipe module.
+# The directory is located in the pipe_hardlink_base_dir_name directory of
IoTDB.
+# pipe_hardlink_tsfile_dir_name=tsfile
+
+# The name of the directory that stores the wal temporarily hold or generated
by the pipe module.
+# The directory is located in the pipe_hardlink_base_dir_name directory of
IoTDB.
+# pipe_hardlink_wal_dir_name=wal
+
+# Enable hardlink for wal files or not. If enabled, the wal files will be
hardlink-ed to
+# the pipe_hardlink_wal_dir_name directory.
+# pipe_hardlink_wal_enabled=false
# The row size of tablets created in pipe transfer.
# pipe_data_structure_tablet_row_size=65536
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3732ac188fc..eb8d5f7b2f7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -141,7 +141,13 @@ public class CommonConfig {
* The name of the directory that stores the tsfiles temporarily hold or
generated by the pipe
* module. The directory is located in the data directory of IoTDB.
*/
- private String pipeHardlinkTsFileDirName = "pipe";
+ private String pipeHardlinkBaseDirName = "pipe";
+
+ private String pipeHardlinkTsFileDirName = "tsfile";
+
+ private String pipeHardlinkWALDirName = "wal";
+
+ private boolean pipeHardLinkWALEnabled = false;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum = 5;
@@ -458,12 +464,36 @@ public class CommonConfig {
return timestampPrecision;
}
+ public String getPipeHardlinkBaseDirName() {
+ return pipeHardlinkBaseDirName;
+ }
+
+ public void setPipeHardlinkBaseDirName(String pipeHardlinkBaseDirName) {
+ this.pipeHardlinkBaseDirName = pipeHardlinkBaseDirName;
+ }
+
public String getPipeHardlinkTsFileDirName() {
return pipeHardlinkTsFileDirName;
}
- public void setPipeHardlinkTsFileDirName(String pipeHardlinkTsFileDirName) {
- this.pipeHardlinkTsFileDirName = pipeHardlinkTsFileDirName;
+ public void setPipeHardlinkTsFileDirName(String pipeTsFileDirName) {
+ this.pipeHardlinkTsFileDirName = pipeTsFileDirName;
+ }
+
+ public String getPipeHardlinkWALDirName() {
+ return pipeHardlinkWALDirName;
+ }
+
+ public void setPipeHardlinkWALDirName(String pipeWALDirName) {
+ this.pipeHardlinkWALDirName = pipeWALDirName;
+ }
+
+ public boolean getPipeHardLinkWALEnabled() {
+ return pipeHardLinkWALEnabled;
+ }
+
+ public void setPipeHardLinkWALEnabled(boolean pipeHardLinkWALEnabled) {
+ this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled;
}
public int getPipeDataStructureTabletRowSize() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ae2ae904fe4..f45c92d479c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -241,9 +241,19 @@ public class CommonDescriptor {
}
private void loadPipeProps(Properties properties) {
+ config.setPipeHardlinkBaseDirName(
+ properties.getProperty("pipe_hardlink_base_dir_name",
config.getPipeHardlinkBaseDirName()));
config.setPipeHardlinkTsFileDirName(
properties.getProperty(
"pipe_hardlink_tsfile_dir_name",
config.getPipeHardlinkTsFileDirName()));
+ config.setPipeHardlinkWALDirName(
+ properties.getProperty("pipe_hardlink_wal_dir_name",
config.getPipeHardlinkWALDirName()));
+ config.setPipeHardLinkWALEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_hardlink_wal_enabled",
+ Boolean.toString(config.getPipeHardLinkWALEnabled()))));
+
config.setPipeDataStructureTabletRowSize(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 6bbe69079f2..9cb9f4d70c0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -31,10 +31,22 @@ public class PipeConfig {
/////////////////////////////// File ///////////////////////////////
+ public String getPipeHardlinkBaseDirName() {
+ return COMMON_CONFIG.getPipeHardlinkBaseDirName();
+ }
+
public String getPipeHardlinkTsFileDirName() {
return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
}
+ public String getPipeHardlinkWALDirName() {
+ return COMMON_CONFIG.getPipeHardlinkWALDirName();
+ }
+
+ public boolean getPipeHardLinkWALEnabled() {
+ return COMMON_CONFIG.getPipeHardLinkWALEnabled();
+ }
+
/////////////////////////////// Tablet ///////////////////////////////
public int getPipeDataStructureTabletRowSize() {
@@ -126,7 +138,12 @@ public class PipeConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
public void printAllConfigs() {
+ LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
LOGGER.info("PipeHardlinkTsFileDirName: {}",
getPipeHardlinkTsFileDirName());
+ LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
+ LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
+
+ LOGGER.info("PipeDataStructureTabletRowSize: {}",
getPipeDataStructureTabletRowSize());
LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}",
getPipeSubtaskExecutorMaxThreadNum());
LOGGER.info(
@@ -147,6 +164,7 @@ public class PipeConfig {
LOGGER.info(
"PipeExtractorPendingQueueTabletLimit: {}",
getPipeExtractorPendingQueueTabletLimit());
+ LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
LOGGER.info("PipeConnectorPendingQueueSize: {}",
getPipeConnectorPendingQueueSize());
@@ -158,6 +176,10 @@ public class PipeConfig {
LOGGER.info(
"PipeMetaSyncerInitialSyncDelayMinutes: {}",
getPipeMetaSyncerInitialSyncDelayMinutes());
LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}",
getPipeMetaSyncerSyncIntervalMinutes());
+ LOGGER.info(
+ "PipeMetaSyncerAutoRestartPipeCheckIntervalRound: {}",
+ getPipeMetaSyncerAutoRestartPipeCheckIntervalRound());
+ LOGGER.info("PipeAutoRestartEnabled: {}", getPipeAutoRestartEnabled());
}
/////////////////////////////// Singleton ///////////////////////////////