This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.9
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/rel/0.9 by this push:
     new 279b41c  Fix merge caused errors for TsFile storage in HDFS (#1153)
279b41c is described below

commit 279b41c2c623e7961886a14d77673d3985c26185
Author: Zesong Sun <[email protected]>
AuthorDate: Wed May 6 10:18:55 2020 +0800

    Fix merge caused errors for TsFile storage in HDFS (#1153)
---
 .../apache/iotdb/hadoop/fileSystem/HDFSFile.java   | 24 ++++++++-------------
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 25 +++++++++++++---------
 2 files changed, 24 insertions(+), 25 deletions(-)

diff --git 
a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java 
b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index f82e024..53a48bb 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -130,7 +130,7 @@ public class HDFSFile extends File {
 
   @Override
   public File getParentFile() {
-    return new HDFSFile(hdfsPath.getParent().getName());
+    return new HDFSFile(hdfsPath.getParent().toUri().toString());
   }
 
   @Override
@@ -151,10 +151,7 @@ public class HDFSFile extends File {
   @Override
   public boolean mkdirs() {
     try {
-      if (exists()) {
-        return false;
-      }
-      return fs.mkdirs(hdfsPath);
+      return !exists() && fs.mkdirs(hdfsPath);
     } catch (IOException e) {
       logger.error("Fail to create directory {}. ", 
hdfsPath.toUri().toString(), e);
       return false;
@@ -208,10 +205,7 @@ public class HDFSFile extends File {
 
   @Override
   public boolean equals(Object obj) {
-    if ((obj != null) && (obj instanceof HDFSFile)) {
-      return compareTo((HDFSFile) obj) == 0;
-    }
-    return false;
+    return obj instanceof HDFSFile && compareTo((HDFSFile) obj) == 0;
   }
 
   @Override
@@ -235,7 +229,7 @@ public class HDFSFile extends File {
 
   public BufferedWriter getBufferedWriter(String filePath, boolean append) {
     try {
-        return new BufferedWriter(new OutputStreamWriter(fs.create(new 
Path(filePath))));
+      return new BufferedWriter(new OutputStreamWriter(fs.create(new 
Path(filePath))));
     } catch (IOException e) {
       logger.error("Failed to get buffered writer for {}. ", filePath, e);
       return null;
@@ -290,22 +284,22 @@ public class HDFSFile extends File {
   }
 
   @Override
-  public String getParent() {
-    throw new UnsupportedOperationException("Unsupported operation.");
+  public File getAbsoluteFile() {
+    return new HDFSFile(getAbsolutePath());
   }
 
   @Override
-  public boolean isAbsolute() {
+  public String getParent() {
     throw new UnsupportedOperationException("Unsupported operation.");
   }
 
   @Override
-  public File[] listFiles(FileFilter filter) {
+  public boolean isAbsolute() {
     throw new UnsupportedOperationException("Unsupported operation.");
   }
 
   @Override
-  public File getAbsoluteFile() {
+  public File[] listFiles(FileFilter filter) {
     throw new UnsupportedOperationException("Unsupported operation.");
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 7b179fa..f33cdca 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
@@ -38,6 +37,8 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -63,6 +64,8 @@ class MergeFileTask {
   private MergeResource resource;
   private List<TsFileResource> unmergedFiles;
 
+  private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
   MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
       MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
     this.taskName = taskName;
@@ -157,10 +160,11 @@ class MergeFileTask {
       newFileWriter.getFile().delete();
 
       File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
-      FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
-      FileUtils
-          .moveFile(new File(seqFile.getFile().getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX),
-              new File(nextMergeVersionFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX));
+      fsFactory.moveFile(seqFile.getFile(), nextMergeVersionFile);
+      fsFactory.moveFile(fsFactory
+                  .getFile(seqFile.getFile().getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX),
+              fsFactory.getFile(
+                  nextMergeVersionFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX));
       seqFile.setFile(nextMergeVersionFile);
     } finally {
       seqFile.getWriteQueryLock().writeLock().unlock();
@@ -226,10 +230,11 @@ class MergeFileTask {
       seqFile.getFile().delete();
 
       File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
-      FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
-      FileUtils
-          .moveFile(new File(seqFile.getFile().getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX),
-              new File(nextMergeVersionFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX));
+      fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
+      fsFactory.moveFile(fsFactory
+                  .getFile(seqFile.getFile().getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX),
+              fsFactory.getFile(
+                  nextMergeVersionFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX));
       seqFile.setFile(nextMergeVersionFile);
     } finally {
       seqFile.getWriteQueryLock().writeLock().unlock();
@@ -240,7 +245,7 @@ class MergeFileTask {
     String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "")
         .split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
     int mergeVersion = Integer.parseInt(splits[2]) + 1;
-    return new File(seqFile.getParentFile(),
+    return fsFactory.getFile(seqFile.getParentFile(),
         splits[0] + IoTDBConstant.TSFILE_NAME_SEPARATOR + splits[1]
             + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeVersion + 
TSFILE_SUFFIX);
   }

Reply via email to