This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch hdfs_fix_0.9 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0342ff3a76338be4151760c87bc074d01e5abd7c Author: samperson1997 <[email protected]> AuthorDate: Mon May 4 22:55:47 2020 +0800 Fix merge caused errors for TsFile storage in HDFS --- .../apache/iotdb/hadoop/fileSystem/HDFSFile.java | 24 ++++++++------------- .../iotdb/db/engine/merge/task/MergeFileTask.java | 25 +++++++++++++--------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java index f82e024..53a48bb 100644 --- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java +++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java @@ -130,7 +130,7 @@ public class HDFSFile extends File { @Override public File getParentFile() { - return new HDFSFile(hdfsPath.getParent().getName()); + return new HDFSFile(hdfsPath.getParent().toUri().toString()); } @Override @@ -151,10 +151,7 @@ public class HDFSFile extends File { @Override public boolean mkdirs() { try { - if (exists()) { - return false; - } - return fs.mkdirs(hdfsPath); + return !exists() && fs.mkdirs(hdfsPath); } catch (IOException e) { logger.error("Fail to create directory {}. ", hdfsPath.toUri().toString(), e); return false; @@ -208,10 +205,7 @@ public class HDFSFile extends File { @Override public boolean equals(Object obj) { - if ((obj != null) && (obj instanceof HDFSFile)) { - return compareTo((HDFSFile) obj) == 0; - } - return false; + return obj instanceof HDFSFile && compareTo((HDFSFile) obj) == 0; } @Override @@ -235,7 +229,7 @@ public class HDFSFile extends File { public BufferedWriter getBufferedWriter(String filePath, boolean append) { try { - return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath)))); + return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath)))); } catch (IOException e) { logger.error("Failed to get buffered writer for {}. ", filePath, e); return null; @@ -290,22 +284,22 @@ public class HDFSFile extends File { } @Override - public String getParent() { - throw new UnsupportedOperationException("Unsupported operation."); + public File getAbsoluteFile() { + return new HDFSFile(getAbsolutePath()); } @Override - public boolean isAbsolute() { + public String getParent() { throw new UnsupportedOperationException("Unsupported operation."); } @Override - public File[] listFiles(FileFilter filter) { + public boolean isAbsolute() { throw new UnsupportedOperationException("Unsupported operation."); } @Override - public File getAbsoluteFile() { + public File[] listFiles(FileFilter filter) { throw new UnsupportedOperationException("Unsupported operation."); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java index 7b179fa..f33cdca 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache; import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache; @@ -38,6 +37,8 @@ import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; @@ -63,6 +64,8 @@ class MergeFileTask { private MergeResource resource; private List<TsFileResource> unmergedFiles; + private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger, MergeResource resource, List<TsFileResource> unmergedSeqFiles) { this.taskName = taskName; @@ -157,10 +160,11 @@ class MergeFileTask { newFileWriter.getFile().delete(); File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile()); - FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile); - FileUtils - .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), - new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); + fsFactory.moveFile(seqFile.getFile(), nextMergeVersionFile); + fsFactory.moveFile(fsFactory + .getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), + fsFactory.getFile( + nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); seqFile.setFile(nextMergeVersionFile); } finally { seqFile.getWriteQueryLock().writeLock().unlock(); @@ -226,10 +230,11 @@ class MergeFileTask { seqFile.getFile().delete(); File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile()); - FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile); - FileUtils - .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), - new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); + fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile); + fsFactory.moveFile(fsFactory + .getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), + fsFactory.getFile( + nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); seqFile.setFile(nextMergeVersionFile); } finally { seqFile.getWriteQueryLock().writeLock().unlock(); @@ -240,7 +245,7 @@ class MergeFileTask { String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "") .split(IoTDBConstant.TSFILE_NAME_SEPARATOR); int mergeVersion = Integer.parseInt(splits[2]) + 1; - return new File(seqFile.getParentFile(), + return fsFactory.getFile(seqFile.getParentFile(), splits[0] + IoTDBConstant.TSFILE_NAME_SEPARATOR + splits[1] + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX); }
