This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch hdfs_fix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 66ce019394bbfd2a086fe45c6b274d17398884f2
Author: samperson1997 <[email protected]>
AuthorDate: Sat May 2 12:44:52 2020 +0800

    Fix merge and flush caused errors for TsFile storage in HDFS
---
 .../apache/iotdb/hadoop/fileSystem/HDFSFile.java   | 24 +++++++----------
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 31 +++++++++++++---------
 .../db/engine/storagegroup/TsFileResource.java     | 12 ++++-----
 .../tsfile/fileSystem/fsFactory/HDFSFactory.java   |  1 -
 4 files changed, 34 insertions(+), 34 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index f82e024..ca75f81 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -130,7 +130,7 @@ public class HDFSFile extends File {
 
   @Override
   public File getParentFile() {
-    return new HDFSFile(hdfsPath.getParent().getName());
+    return new HDFSFile(hdfsPath.getParent().toUri().toString());
   }
 
   @Override
@@ -151,10 +151,7 @@ public class HDFSFile extends File {
   @Override
   public boolean mkdirs() {
     try {
-      if (exists()) {
-        return false;
-      }
-      return fs.mkdirs(hdfsPath);
+      return !exists() && fs.mkdirs(hdfsPath);
     } catch (IOException e) {
       logger.error("Fail to create directory {}. ", hdfsPath.toUri().toString(), e);
       return false;
@@ -208,10 +205,7 @@ public class HDFSFile extends File {
 
   @Override
   public boolean equals(Object obj) {
-    if ((obj != null) && (obj instanceof HDFSFile)) {
-      return compareTo((HDFSFile) obj) == 0;
-    }
-    return false;
+    return (obj != null) && (obj instanceof HDFSFile) && compareTo((HDFSFile) obj) == 0;
   }
 
   @Override
@@ -235,7 +229,7 @@ public class HDFSFile extends File {
 
   public BufferedWriter getBufferedWriter(String filePath, boolean append) {
     try {
-        return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
+      return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
     } catch (IOException e) {
       logger.error("Failed to get buffered writer for {}. ", filePath, e);
       return null;
@@ -290,22 +284,22 @@ public class HDFSFile extends File {
   }
 
   @Override
-  public String getParent() {
-    throw new UnsupportedOperationException("Unsupported operation.");
+  public File getAbsoluteFile() {
+    return new HDFSFile(getAbsolutePath());
   }
 
   @Override
-  public boolean isAbsolute() {
+  public String getParent() {
     throw new UnsupportedOperationException("Unsupported operation.");
   }
 
   @Override
-  public File[] listFiles(FileFilter filter) {
+  public boolean isAbsolute() {
     throw new UnsupportedOperationException("Unsupported operation.");
   }
 
   @Override
-  public File getAbsoluteFile() {
+  public File[] listFiles(FileFilter filter) {
     throw new UnsupportedOperationException("Unsupported operation.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 8c1b848..b6a5778 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -23,10 +23,11 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-
-import org.apache.commons.io.FileUtils;
+import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
@@ -37,6 +38,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -61,6 +64,8 @@ class MergeFileTask {
   private MergeResource resource;
   private List<TsFileResource> unmergedFiles;
 
+  private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
   MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
       MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
     this.taskName = taskName;
@@ -165,10 +170,11 @@ class MergeFileTask {
       newFileWriter.getFile().delete();
 
       File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
-      FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
-      FileUtils
-          .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
-              new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+      fsFactory.moveFile(seqFile.getFile(), nextMergeVersionFile);
+      fsFactory.moveFile(
+          fsFactory.getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
+          fsFactory
+              .getFile(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
       seqFile.setFile(nextMergeVersionFile);
     } catch (Exception e) {
      RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
@@ -262,10 +268,11 @@ class MergeFileTask {
       seqFile.getFile().delete();
 
       File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
-      FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
-      FileUtils
-          .moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
-              new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+      fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
+      fsFactory.moveFile(
+          fsFactory.getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
+          fsFactory
+              .getFile(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
       seqFile.setFile(nextMergeVersionFile);
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
@@ -278,7 +285,7 @@ class MergeFileTask {
     String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "")
         .split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
     int mergeVersion = Integer.parseInt(splits[2]) + 1;
-    return new File(seqFile.getParentFile(),
+    return fsFactory.getFile(seqFile.getParentFile(),
         splits[0] + IoTDBConstant.TSFILE_NAME_SEPARATOR + splits[1]
             + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 43eac5f..b354b0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -362,9 +362,9 @@ public class TsFileResource {
   }
 
   void moveTo(File targetDir) throws IOException {
-    FileUtils.moveFile(file, new File(targetDir, file.getName()));
-    FileUtils.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
-        new File(targetDir, file.getName() + RESOURCE_SUFFIX));
+    fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName()));
+    fsFactory.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
+        fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX));
     fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
   }
 
@@ -442,7 +442,7 @@ public class TsFileResource {
    */
   void setCloseFlag() {
     try {
-      new File(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile();
+      fsFactory.getFile(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile();
     } catch (IOException e) {
       logger.error("Cannot create close flag for {}", file, e);
     }
@@ -452,11 +452,11 @@ public class TsFileResource {
    * clean the close flag (if existed) when the file is successfully closed.
    */
   public void cleanCloseFlag() {
-    new File(file.getAbsoluteFile() + CLOSING_SUFFIX).delete();
+    fsFactory.getFile(file.getAbsoluteFile() + CLOSING_SUFFIX).delete();
   }
 
   public boolean isCloseFlagSet() {
-    return new File(file.getAbsoluteFile() + CLOSING_SUFFIX).exists();
+    return fsFactory.getFile(file.getAbsoluteFile() + CLOSING_SUFFIX).exists();
   }
 
   public Set<Long> getHistoricalVersions() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index 0e80f88..2ee89d9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -28,7 +28,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Reply via email to