Repository: trafodion Updated Branches: refs/heads/master f493a6573 -> 0ebd76e19
[TRAFODION-3126] Refactored HDFS client implementation should also support the Alluxio file system. Alluxio doesn't support direct ByteBuffer access. This problem is circumvented by using a non-direct ByteBuffer to read HDFS files when they belong to an Alluxio file system. There is no need to change the default setting of USE_LIBHDFS for Alluxio to work. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/2a6cfd1a Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/2a6cfd1a Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/2a6cfd1a Branch: refs/heads/master Commit: 2a6cfd1a24d05e1e243919897a11572e68c14d59 Parents: 82bfb1a Author: selvaganesang <[email protected]> Authored: Fri Jun 29 21:26:49 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Fri Jun 29 21:26:49 2018 +0000 ---------------------------------------------------------------------- core/sql/pom.xml | 6 +++ core/sql/pom.xml.apache | 6 +++ core/sql/pom.xml.hdp | 6 +++ .../main/java/org/trafodion/sql/HDFSClient.java | 47 ++++++++++++++++---- 4 files changed, 56 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/pom.xml ---------------------------------------------------------------------- diff --git a/core/sql/pom.xml b/core/sql/pom.xml index cd025a1..7b143d9 100644 --- a/core/sql/pom.xml +++ b/core/sql/pom.xml @@ -84,6 +84,12 @@ <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> + <dependency> + <groupId>org.alluxio</groupId> + <artifactId>alluxio-core-client-runtime</artifactId> + <scope>compile</scope> + <version>1.7.1</version> + </dependency> </dependencies> <groupId>org.trafodion.sql</groupId> http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/pom.xml.apache ---------------------------------------------------------------------- diff --git 
a/core/sql/pom.xml.apache b/core/sql/pom.xml.apache index 938bbbb..a8afd33 100644 --- a/core/sql/pom.xml.apache +++ b/core/sql/pom.xml.apache @@ -119,6 +119,12 @@ <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> + <dependency> + <groupId>org.alluxio</groupId> + <artifactId>alluxio-core-client-runtime</artifactId> + <scope>compile</scope> + <version>1.7.1</version> + </dependency> </dependencies> <groupId>org.trafodion.sql</groupId> http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/pom.xml.hdp ---------------------------------------------------------------------- diff --git a/core/sql/pom.xml.hdp b/core/sql/pom.xml.hdp index 5b1216f..52bac2e 100644 --- a/core/sql/pom.xml.hdp +++ b/core/sql/pom.xml.hdp @@ -99,6 +99,12 @@ <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> + <dependency> + <groupId>org.alluxio</groupId> + <artifactId>alluxio-core-client-runtime</artifactId> + <scope>compile</scope> + <version>1.7.1</version> + </dependency> </dependencies> <groupId>org.trafodion.sql</groupId> http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 95316d5..d4a697f 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -87,6 +87,7 @@ public class HDFSClient private static ExecutorService executorService_ = null; private static FileSystem defaultFs_ = null; private static CompressionCodecFactory codecFactory_ = null; + private static boolean alluxioNotInstalled_ = false; private FileSystem fs_ = null; private int bufNo_; private int rangeNo_; @@ -95,6 +96,7 @@ public class HDFSClient private OutputStream outStream_; private String filename_; private 
ByteBuffer buf_; + private ByteBuffer savedBuf_; private byte[] bufArray_; private int bufLen_; private int bufOffset_ = 0; @@ -126,6 +128,16 @@ public class HDFSClient catch (IOException ioe) { throw new RuntimeException("Exception in HDFSClient static block", ioe); } + try { + boolean alluxioFs = defaultFs_ instanceof alluxio.hadoop.FileSystem; + } + catch (Throwable rte) + { + // Ignore the exception. It is not needed for alluxio to be installed + // for the methods of this class to work if + // alluxio filesystem is NOT required + alluxioNotInstalled_ = true; + } codecFactory_ = new CompressionCodecFactory(config_); System.loadLibrary("executor"); } @@ -142,21 +154,32 @@ public class HDFSClient HDFSRead() { } - + public Object call() throws IOException { int bytesRead; int totalBytesRead = 0; if (compressed_) { bufArray_ = new byte[ioByteArraySizeInKB_ * 1024]; - } else - if (! buf_.hasArray()) { - try { - fsdis_.seek(pos_); - } catch (EOFException e) { - isEOF_ = 1; - return new Integer(totalBytesRead); - } + } + else { + // alluxio doesn't support direct ByteBuffer reads + // Hence, create a non-direct ByteBuffer, read into + // byteArray backing up this ByteBuffer and + // then copy the data read to direct ByteBuffer for the + // native layer to process the data + if ((! alluxioNotInstalled_) && fs_ instanceof alluxio.hadoop.FileSystem) { + savedBuf_ = buf_; + buf_ = ByteBuffer.allocate(savedBuf_.capacity()); + } + if (! buf_.hasArray()) { + try { + fsdis_.seek(pos_); + } catch (EOFException e) { + isEOF_ = 1; + return new Integer(totalBytesRead); + } + } } do { @@ -181,6 +204,12 @@ public class HDFSClient pos_ += bytesRead; lenRemain_ -= bytesRead; } while (lenRemain_ > 0); + if ((! alluxioNotInstalled_) && fs_ instanceof alluxio.hadoop.FileSystem) { + if (totalBytesRead > 0) { + byte[] temp = buf_.array(); + savedBuf_.put(temp, 0, totalBytesRead); + } + } return new Integer(totalBytesRead); } }
