This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch hdfs_fix in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 66ce019394bbfd2a086fe45c6b274d17398884f2 Author: samperson1997 <[email protected]> AuthorDate: Sat May 2 12:44:52 2020 +0800 Fix merge and flush caused errors for TsFile storage in HDFS --- .../apache/iotdb/hadoop/fileSystem/HDFSFile.java | 24 +++++++---------- .../iotdb/db/engine/merge/task/MergeFileTask.java | 31 +++++++++++++--------- .../db/engine/storagegroup/TsFileResource.java | 12 ++++----- .../tsfile/fileSystem/fsFactory/HDFSFactory.java | 1 - 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java index f82e024..ca75f81 100644 --- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java +++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java @@ -130,7 +130,7 @@ public class HDFSFile extends File { @Override public File getParentFile() { - return new HDFSFile(hdfsPath.getParent().getName()); + return new HDFSFile(hdfsPath.getParent().toUri().toString()); } @Override @@ -151,10 +151,7 @@ public class HDFSFile extends File { @Override public boolean mkdirs() { try { - if (exists()) { - return false; - } - return fs.mkdirs(hdfsPath); + return !exists() && fs.mkdirs(hdfsPath); } catch (IOException e) { logger.error("Fail to create directory {}. ", hdfsPath.toUri().toString(), e); return false; @@ -208,10 +205,7 @@ public class HDFSFile extends File { @Override public boolean equals(Object obj) { - if ((obj != null) && (obj instanceof HDFSFile)) { - return compareTo((HDFSFile) obj) == 0; - } - return false; + return (obj != null) && (obj instanceof HDFSFile) && compareTo((HDFSFile) obj) == 0; } @Override @@ -235,7 +229,7 @@ public class HDFSFile extends File { public BufferedWriter getBufferedWriter(String filePath, boolean append) { try { - return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath)))); + return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath)))); } catch (IOException e) { logger.error("Failed to get buffered writer for {}. ", filePath, e); return null; @@ -290,22 +284,22 @@ public class HDFSFile extends File { } @Override - public String getParent() { - throw new UnsupportedOperationException("Unsupported operation."); + public File getAbsoluteFile() { + return new HDFSFile(getAbsolutePath()); } @Override - public boolean isAbsolute() { + public String getParent() { throw new UnsupportedOperationException("Unsupported operation."); } @Override - public File[] listFiles(FileFilter filter) { + public boolean isAbsolute() { throw new UnsupportedOperationException("Unsupported operation."); } @Override - public File getAbsoluteFile() { + public File[] listFiles(FileFilter filter) { throw new UnsupportedOperationException("Unsupported operation."); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java index 8c1b848..b6a5778 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java @@ -23,10 +23,11 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; - -import org.apache.commons.io.FileUtils; +import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.cache.ChunkMetadataCache; import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache; @@ -37,6 +38,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; @@ -61,6 +64,8 @@ class MergeFileTask { private MergeResource resource; private List<TsFileResource> unmergedFiles; + private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger, MergeResource resource, List<TsFileResource> unmergedSeqFiles) { this.taskName = taskName; @@ -165,10 +170,11 @@ class MergeFileTask { newFileWriter.getFile().delete(); File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile()); - FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile); - FileUtils - .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), - new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); + fsFactory.moveFile(seqFile.getFile(), nextMergeVersionFile); + fsFactory.moveFile( + fsFactory.getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), + fsFactory + .getFile(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); seqFile.setFile(nextMergeVersionFile); } catch (Exception e) { RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter( @@ -262,10 +268,11 @@ class MergeFileTask { seqFile.getFile().delete(); File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile()); - FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile); - FileUtils - .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), - new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); + fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile); + fsFactory.moveFile( + fsFactory.getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), + fsFactory + .getFile(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); seqFile.setFile(nextMergeVersionFile); } catch (Exception e) { logger.error(e.getMessage(), e); @@ -278,7 +285,7 @@ class MergeFileTask { String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "") .split(IoTDBConstant.TSFILE_NAME_SEPARATOR); int mergeVersion = Integer.parseInt(splits[2]) + 1; - return new File(seqFile.getParentFile(), + return fsFactory.getFile(seqFile.getParentFile(), splits[0] + IoTDBConstant.TSFILE_NAME_SEPARATOR + splits[1] + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 43eac5f..b354b0b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -362,9 +362,9 @@ public class TsFileResource { } void moveTo(File targetDir) throws IOException { - FileUtils.moveFile(file, new File(targetDir, file.getName())); - FileUtils.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX), - new File(targetDir, file.getName() + RESOURCE_SUFFIX)); + fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName())); + fsFactory.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX), + fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX)); fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete(); } @@ -442,7 +442,7 @@ public class TsFileResource { */ void setCloseFlag() { try { - new File(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile(); + fsFactory.getFile(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile(); } catch (IOException e) { logger.error("Cannot create close flag for {}", file, e); } @@ -452,11 +452,11 @@ public class TsFileResource { * clean the close flag (if existed) when the file is successfully closed. */ public void cleanCloseFlag() { - new File(file.getAbsoluteFile() + CLOSING_SUFFIX).delete(); + fsFactory.getFile(file.getAbsoluteFile() + CLOSING_SUFFIX).delete(); } public boolean isCloseFlagSet() { - return new File(file.getAbsoluteFile() + CLOSING_SUFFIX).exists(); + return fsFactory.getFile(file.getAbsoluteFile() + CLOSING_SUFFIX).exists(); } public Set<Long> getHistoricalVersions() { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java index 0e80f88..2ee89d9 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java @@ -28,7 +28,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URI; - import org.slf4j.Logger; import org.slf4j.LoggerFactory;
