This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3276737  Fix concurrent queries cause BufferUnderflowException when storage in HDFS (#1151)
3276737 is described below

commit 32767378b19e06b68cd1c66644b1baa86b659318
Author: Zesong Sun <[email protected]>
AuthorDate: Sat May 9 16:30:24 2020 +0800

    Fix concurrent queries cause BufferUnderflowException when storage in HDFS (#1151)
---
 .../apache/iotdb/hadoop/fileSystem/HDFSFile.java   |  2 +-
 .../apache/iotdb/hadoop/fileSystem/HDFSInput.java  | 45 +++++++++-------------
 .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 15 ++------
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 25 +++---------
 4 files changed, 29 insertions(+), 58 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index 53a48bb..68a0d50 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -141,7 +141,7 @@ public class HDFSFile extends File {
   @Override
   public boolean delete() {
     try {
-      return fs.delete(hdfsPath, true);
+      return !fs.exists(hdfsPath) || fs.delete(hdfsPath, true);
     } catch (IOException e) {
       logger.error("Fail to delete file {}. ", hdfsPath.toUri().toString(), e);
       return false;
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
index f3d6274..0b7da82 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
@@ -25,7 +25,6 @@ import java.nio.channels.FileChannel;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
@@ -33,33 +32,26 @@ import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 public class HDFSInput implements TsFileInput {
 
   private FSDataInputStream fsDataInputStream;
-  private FileStatus fileStatus;
-  private boolean byteBufferReadable;
+  private FileSystem fs;
+  private Path path;
 
   public HDFSInput(String filePath) throws IOException {
-
-    this(filePath, new Configuration());
+    this(new Path(filePath), new Configuration());
   }
 
  public HDFSInput(String filePath, Configuration configuration) throws IOException {
-
     this(new Path(filePath), configuration);
   }
 
   public HDFSInput(Path path, Configuration configuration) throws IOException {
-    FileSystem fs = path.getFileSystem(configuration);
-    if (fs instanceof ChecksumFileSystem) {
-      byteBufferReadable = false;
-    } else {
-      byteBufferReadable = true;
-    }
-    fsDataInputStream = fs.open(path);
-    fileStatus = fs.getFileStatus(path);
+    this.fs = path.getFileSystem(configuration);
+    this.path = path;
+    this.fsDataInputStream = fs.open(path);
   }
 
   @Override
-  public long size() {
-    return fileStatus.getLen();
+  public long size() throws IOException {
+    return fs.getFileStatus(path).getLen();
   }
 
   @Override
@@ -68,26 +60,26 @@ public class HDFSInput implements TsFileInput {
   }
 
   @Override
-  public TsFileInput position(long newPosition) throws IOException {
+  public synchronized TsFileInput position(long newPosition) throws IOException {
     fsDataInputStream.seek(newPosition);
     return this;
   }
 
   @Override
-  public int read(ByteBuffer dst) throws IOException {
+  public synchronized int read(ByteBuffer dst) throws IOException {
     int res;
-    if (byteBufferReadable) {
-      res = fsDataInputStream.read(dst);
-    } else {
+    if (fs instanceof ChecksumFileSystem) {
       byte[] bytes = new byte[dst.remaining()];
       res = fsDataInputStream.read(bytes);
       dst.put(bytes);
+    } else {
+      res = fsDataInputStream.read(dst);
     }
     return res;
   }
 
   @Override
-  public int read(ByteBuffer dst, long position) throws IOException {
+  public synchronized int read(ByteBuffer dst, long position) throws IOException {
     if (position < 0) {
       throw new IllegalArgumentException("position must be non-negative");
     }
@@ -107,17 +99,17 @@ public class HDFSInput implements TsFileInput {
 
   @Override
   public int read() throws IOException {
-    throw new IOException("Not support");
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
-    throw new IOException("Not support");
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public FileChannel wrapAsFileChannel() throws IOException {
-    throw new IOException("Not support");
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -132,7 +124,6 @@ public class HDFSInput implements TsFileInput {
 
   @Override
   public int readInt() throws IOException {
-    throw new IOException("Not support");
+    throw new UnsupportedOperationException();
   }
-
 }
diff --git 
a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java 
b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index f38e398..2e86fda 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -18,17 +18,14 @@
  */
 package org.apache.iotdb.hadoop.fileSystem;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 
 /**
@@ -40,14 +37,12 @@ public class HDFSOutput implements TsFileOutput {
   private FSDataOutputStream fsDataOutputStream;
   private FileSystem fs;
   private Path path;
-  private static final Logger logger = LoggerFactory.getLogger(HDFSOutput.class);
 
   public HDFSOutput(String filePath, boolean overwrite) throws IOException {
     this(filePath, new Configuration(), overwrite);
     path = new Path(filePath);
   }
 
-
-  public HDFSOutput(String filePath, Configuration configuration, boolean overwrite)
       throws IOException {
     this(new Path(filePath), configuration, overwrite);
@@ -96,9 +91,7 @@ public class HDFSOutput implements TsFileOutput {
     if (fs.exists(path)) {
       fsDataOutputStream.close();
     }
-    if (!fs.truncate(path, position)) {
-      logger.error("Failed to truncate file {}. ", path.toUri().toString());
-    }
+    fs.truncate(path, position);
     if (fs.exists(path)) {
       fsDataOutputStream = fs.append(path);
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 00553b9..0e23dc2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -195,7 +195,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    */
   public String readTailMagic() throws IOException {
     long totalSize = tsFileInput.size();
-    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
+    ByteBuffer magicStringBytes = ByteBuffer
+        .allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
     tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.getBytes().length);
     magicStringBytes.flip();
     return new String(magicStringBytes.array());
@@ -215,23 +216,9 @@ public class TsFileSequenceReader implements AutoCloseable {
    * this function does not modify the position of the file reader.
    */
   public String readHeadMagic() throws IOException {
-    return readHeadMagic(false);
-  }
-
-  /**
-   * this function does not modify the position of the file reader.
-   *
-   * @param movePosition whether move the position of the file reader after reading the magic header to the end of the magic head string.
-   */
-  public String readHeadMagic(boolean movePosition) throws IOException {
-    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
-    if (movePosition) {
-      tsFileInput.position(0);
-      tsFileInput.read(magicStringBytes);
-    } else {
-      tsFileInput.read(magicStringBytes, 0);
-    }
+    ByteBuffer magicStringBytes = ByteBuffer
+        .allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(magicStringBytes, 0);
     magicStringBytes.flip();
     return new String(magicStringBytes.array());
   }
@@ -849,7 +836,7 @@ public class TsFileSequenceReader implements AutoCloseable {
     if (fileSize < headerLength) {
       return TsFileCheckStatus.INCOMPATIBLE_FILE;
     }
-    String magic = readHeadMagic(true);
+    String magic = readHeadMagic();
     tsFileInput.position(headerLength);
     if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
       return TsFileCheckStatus.INCOMPATIBLE_FILE;

Reply via email to