This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.9
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/rel/0.9 by this push:
new 279b41c Fix merge caused errors for TsFile storage in HDFS (#1153)
279b41c is described below
commit 279b41c2c623e7961886a14d77673d3985c26185
Author: Zesong Sun <[email protected]>
AuthorDate: Wed May 6 10:18:55 2020 +0800
Fix merge caused errors for TsFile storage in HDFS (#1153)
---
.../apache/iotdb/hadoop/fileSystem/HDFSFile.java | 24 ++++++++-------------
.../iotdb/db/engine/merge/task/MergeFileTask.java | 25 +++++++++++++---------
2 files changed, 24 insertions(+), 25 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index f82e024..53a48bb 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -130,7 +130,7 @@ public class HDFSFile extends File {
@Override
public File getParentFile() {
- return new HDFSFile(hdfsPath.getParent().getName());
+ return new HDFSFile(hdfsPath.getParent().toUri().toString());
}
@Override
@@ -151,10 +151,7 @@ public class HDFSFile extends File {
@Override
public boolean mkdirs() {
try {
- if (exists()) {
- return false;
- }
- return fs.mkdirs(hdfsPath);
+ return !exists() && fs.mkdirs(hdfsPath);
} catch (IOException e) {
logger.error("Fail to create directory {}. ",
hdfsPath.toUri().toString(), e);
return false;
@@ -208,10 +205,7 @@ public class HDFSFile extends File {
@Override
public boolean equals(Object obj) {
- if ((obj != null) && (obj instanceof HDFSFile)) {
- return compareTo((HDFSFile) obj) == 0;
- }
- return false;
+ return obj instanceof HDFSFile && compareTo((HDFSFile) obj) == 0;
}
@Override
@@ -235,7 +229,7 @@ public class HDFSFile extends File {
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
try {
- return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
+ return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
} catch (IOException e) {
logger.error("Failed to get buffered writer for {}. ", filePath, e);
return null;
@@ -290,22 +284,22 @@ public class HDFSFile extends File {
}
@Override
- public String getParent() {
- throw new UnsupportedOperationException("Unsupported operation.");
+ public File getAbsoluteFile() {
+ return new HDFSFile(getAbsolutePath());
}
@Override
- public boolean isAbsolute() {
+ public String getParent() {
throw new UnsupportedOperationException("Unsupported operation.");
}
@Override
- public File[] listFiles(FileFilter filter) {
+ public boolean isAbsolute() {
throw new UnsupportedOperationException("Unsupported operation.");
}
@Override
- public File getAbsoluteFile() {
+ public File[] listFiles(FileFilter filter) {
throw new UnsupportedOperationException("Unsupported operation.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 7b179fa..f33cdca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
@@ -38,6 +37,8 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -63,6 +64,8 @@ class MergeFileTask {
private MergeResource resource;
private List<TsFileResource> unmergedFiles;
+ private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
this.taskName = taskName;
@@ -157,10 +160,11 @@ class MergeFileTask {
newFileWriter.getFile().delete();
File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
- FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
- FileUtils
- .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
- new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+ fsFactory.moveFile(seqFile.getFile(), nextMergeVersionFile);
+ fsFactory.moveFile(fsFactory
+ .getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
+ fsFactory.getFile(
+ nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
@@ -226,10 +230,11 @@ class MergeFileTask {
seqFile.getFile().delete();
File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
- FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
- FileUtils
- .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
- new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+ fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
+ fsFactory.moveFile(fsFactory
+ .getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
+ fsFactory.getFile(
+ nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
@@ -240,7 +245,7 @@ class MergeFileTask {
String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "")
.split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
int mergeVersion = Integer.parseInt(splits[2]) + 1;
- return new File(seqFile.getParentFile(),
+ return fsFactory.getFile(seqFile.getParentFile(),
splits[0] + IoTDBConstant.TSFILE_NAME_SEPARATOR + splits[1]
+ IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX);
}