This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3276737 Fix concurrent queries cause BufferUnderflowException when
storage in HDFS (#1151)
3276737 is described below
commit 32767378b19e06b68cd1c66644b1baa86b659318
Author: Zesong Sun <[email protected]>
AuthorDate: Sat May 9 16:30:24 2020 +0800
Fix concurrent queries cause BufferUnderflowException when storage in HDFS
(#1151)
---
.../apache/iotdb/hadoop/fileSystem/HDFSFile.java | 2 +-
.../apache/iotdb/hadoop/fileSystem/HDFSInput.java | 45 +++++++++-------------
.../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 15 ++------
.../iotdb/tsfile/read/TsFileSequenceReader.java | 25 +++---------
4 files changed, 29 insertions(+), 58 deletions(-)
diff --git
a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index 53a48bb..68a0d50 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -141,7 +141,7 @@ public class HDFSFile extends File {
@Override
public boolean delete() {
try {
- return fs.delete(hdfsPath, true);
+ return !fs.exists(hdfsPath) || fs.delete(hdfsPath, true);
} catch (IOException e) {
logger.error("Fail to delete file {}. ", hdfsPath.toUri().toString(), e);
return false;
diff --git
a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
index f3d6274..0b7da82 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
@@ -25,7 +25,6 @@ import java.nio.channels.FileChannel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
@@ -33,33 +32,26 @@ import org.apache.iotdb.tsfile.read.reader.TsFileInput;
public class HDFSInput implements TsFileInput {
private FSDataInputStream fsDataInputStream;
- private FileStatus fileStatus;
- private boolean byteBufferReadable;
+ private FileSystem fs;
+ private Path path;
public HDFSInput(String filePath) throws IOException {
-
- this(filePath, new Configuration());
+ this(new Path(filePath), new Configuration());
}
public HDFSInput(String filePath, Configuration configuration) throws
IOException {
-
this(new Path(filePath), configuration);
}
public HDFSInput(Path path, Configuration configuration) throws IOException {
- FileSystem fs = path.getFileSystem(configuration);
- if (fs instanceof ChecksumFileSystem) {
- byteBufferReadable = false;
- } else {
- byteBufferReadable = true;
- }
- fsDataInputStream = fs.open(path);
- fileStatus = fs.getFileStatus(path);
+ this.fs = path.getFileSystem(configuration);
+ this.path = path;
+ this.fsDataInputStream = fs.open(path);
}
@Override
- public long size() {
- return fileStatus.getLen();
+ public long size() throws IOException {
+ return fs.getFileStatus(path).getLen();
}
@Override
@@ -68,26 +60,26 @@ public class HDFSInput implements TsFileInput {
}
@Override
- public TsFileInput position(long newPosition) throws IOException {
+ public synchronized TsFileInput position(long newPosition) throws
IOException {
fsDataInputStream.seek(newPosition);
return this;
}
@Override
- public int read(ByteBuffer dst) throws IOException {
+ public synchronized int read(ByteBuffer dst) throws IOException {
int res;
- if (byteBufferReadable) {
- res = fsDataInputStream.read(dst);
- } else {
+ if (fs instanceof ChecksumFileSystem) {
byte[] bytes = new byte[dst.remaining()];
res = fsDataInputStream.read(bytes);
dst.put(bytes);
+ } else {
+ res = fsDataInputStream.read(dst);
}
return res;
}
@Override
- public int read(ByteBuffer dst, long position) throws IOException {
+ public synchronized int read(ByteBuffer dst, long position) throws
IOException {
if (position < 0) {
throw new IllegalArgumentException("position must be non-negative");
}
@@ -107,17 +99,17 @@ public class HDFSInput implements TsFileInput {
@Override
public int read() throws IOException {
- throw new IOException("Not support");
+ throw new UnsupportedOperationException();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
- throw new IOException("Not support");
+ throw new UnsupportedOperationException();
}
@Override
public FileChannel wrapAsFileChannel() throws IOException {
- throw new IOException("Not support");
+ throw new UnsupportedOperationException();
}
@Override
@@ -132,7 +124,6 @@ public class HDFSInput implements TsFileInput {
@Override
public int readInt() throws IOException {
- throw new IOException("Not support");
+ throw new UnsupportedOperationException();
}
-
}
diff --git
a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index f38e398..2e86fda 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -18,17 +18,14 @@
*/
package org.apache.iotdb.hadoop.fileSystem;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
/**
@@ -40,14 +37,12 @@ public class HDFSOutput implements TsFileOutput {
private FSDataOutputStream fsDataOutputStream;
private FileSystem fs;
private Path path;
- private static final Logger logger =
LoggerFactory.getLogger(HDFSOutput.class);
public HDFSOutput(String filePath, boolean overwrite) throws IOException {
this(filePath, new Configuration(), overwrite);
path = new Path(filePath);
}
-
public HDFSOutput(String filePath, Configuration configuration, boolean
overwrite)
throws IOException {
this(new Path(filePath), configuration, overwrite);
@@ -96,9 +91,7 @@ public class HDFSOutput implements TsFileOutput {
if (fs.exists(path)) {
fsDataOutputStream.close();
}
- if (!fs.truncate(path, position)) {
- logger.error("Failed to truncate file {}. ", path.toUri().toString());
- }
+ fs.truncate(path, position);
if (fs.exists(path)) {
fsDataOutputStream = fs.append(path);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 00553b9..0e23dc2 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -195,7 +195,8 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public String readTailMagic() throws IOException {
long totalSize = tsFileInput.size();
- ByteBuffer magicStringBytes =
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
+ ByteBuffer magicStringBytes = ByteBuffer
+ .allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
tsFileInput.read(magicStringBytes, totalSize -
TSFileConfig.MAGIC_STRING.getBytes().length);
magicStringBytes.flip();
return new String(magicStringBytes.array());
@@ -215,23 +216,9 @@ public class TsFileSequenceReader implements AutoCloseable
{
* this function does not modify the position of the file reader.
*/
public String readHeadMagic() throws IOException {
- return readHeadMagic(false);
- }
-
- /**
- * this function does not modify the position of the file reader.
- *
- * @param movePosition whether move the position of the file reader after
reading the magic header
- * to the end of the magic head string.
- */
- public String readHeadMagic(boolean movePosition) throws IOException {
- ByteBuffer magicStringBytes =
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
- if (movePosition) {
- tsFileInput.position(0);
- tsFileInput.read(magicStringBytes);
- } else {
- tsFileInput.read(magicStringBytes, 0);
- }
+ ByteBuffer magicStringBytes = ByteBuffer
+ .allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
+ tsFileInput.read(magicStringBytes, 0);
magicStringBytes.flip();
return new String(magicStringBytes.array());
}
@@ -849,7 +836,7 @@ public class TsFileSequenceReader implements AutoCloseable {
if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
- String magic = readHeadMagic(true);
+ String magic = readHeadMagic();
tsFileInput.position(headerLength);
if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;