Repository: trafodion
Updated Branches:
  refs/heads/master f493a6573 -> 0ebd76e19


[TRAFODION-3126] Refactored HDFS client implementation should also support 
Alluxio file system

Alluxio doesn't support direct ByteBuffer access. Circumvented
this problem by using a non-direct ByteBuffer to read HDFS files
when they belong to an Alluxio file system.

No need to change the default setting of USE_LIBHDFS for Alluxio to work.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/2a6cfd1a
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/2a6cfd1a
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/2a6cfd1a

Branch: refs/heads/master
Commit: 2a6cfd1a24d05e1e243919897a11572e68c14d59
Parents: 82bfb1a
Author: selvaganesang <[email protected]>
Authored: Fri Jun 29 21:26:49 2018 +0000
Committer: selvaganesang <[email protected]>
Committed: Fri Jun 29 21:26:49 2018 +0000

----------------------------------------------------------------------
 core/sql/pom.xml                                |  6 +++
 core/sql/pom.xml.apache                         |  6 +++
 core/sql/pom.xml.hdp                            |  6 +++
 .../main/java/org/trafodion/sql/HDFSClient.java | 47 ++++++++++++++++----
 4 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/pom.xml
----------------------------------------------------------------------
diff --git a/core/sql/pom.xml b/core/sql/pom.xml
index cd025a1..7b143d9 100644
--- a/core/sql/pom.xml
+++ b/core/sql/pom.xml
@@ -84,6 +84,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.5.0</version>
     </dependency>
+    <dependency>
+       <groupId>org.alluxio</groupId>
+       <artifactId>alluxio-core-client-runtime</artifactId>
+       <scope>compile</scope>
+       <version>1.7.1</version>
+    </dependency>
   </dependencies>
 
   <groupId>org.trafodion.sql</groupId>

http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/pom.xml.apache
----------------------------------------------------------------------
diff --git a/core/sql/pom.xml.apache b/core/sql/pom.xml.apache
index 938bbbb..a8afd33 100644
--- a/core/sql/pom.xml.apache
+++ b/core/sql/pom.xml.apache
@@ -119,6 +119,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.5.0</version>
     </dependency>
+    <dependency>
+       <groupId>org.alluxio</groupId>
+       <artifactId>alluxio-core-client-runtime</artifactId>
+       <scope>compile</scope>
+       <version>1.7.1</version>
+    </dependency>
   </dependencies>
 
   <groupId>org.trafodion.sql</groupId>

http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/pom.xml.hdp
----------------------------------------------------------------------
diff --git a/core/sql/pom.xml.hdp b/core/sql/pom.xml.hdp
index 5b1216f..52bac2e 100644
--- a/core/sql/pom.xml.hdp
+++ b/core/sql/pom.xml.hdp
@@ -99,6 +99,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.5.0</version>
     </dependency>
+    <dependency>
+       <groupId>org.alluxio</groupId>
+       <artifactId>alluxio-core-client-runtime</artifactId>
+       <scope>compile</scope>
+       <version>1.7.1</version>
+    </dependency>
   </dependencies>
 
   <groupId>org.trafodion.sql</groupId>

http://git-wip-us.apache.org/repos/asf/trafodion/blob/2a6cfd1a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 95316d5..d4a697f 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -87,6 +87,7 @@ public class HDFSClient
    private static ExecutorService executorService_ = null;
    private static FileSystem defaultFs_ = null;
    private static CompressionCodecFactory codecFactory_ = null;
+   private static boolean alluxioNotInstalled_ = false;
    private FileSystem fs_ = null;
    private int bufNo_;
    private int rangeNo_;
@@ -95,6 +96,7 @@ public class HDFSClient
    private OutputStream outStream_;
    private String filename_;
    private ByteBuffer buf_;
+   private ByteBuffer savedBuf_;
    private byte[] bufArray_;
    private int bufLen_;
    private int bufOffset_ = 0;
@@ -126,6 +128,16 @@ public class HDFSClient
       catch (IOException ioe) {
          throw new RuntimeException("Exception in HDFSClient static block", 
ioe);
       }
+      try {
+         boolean alluxioFs = defaultFs_ instanceof alluxio.hadoop.FileSystem;
+      }
+      catch (Throwable rte)
+      {
+         // Ignore the exception. It is not needed for alluxio to be installed
+         // for the methods of this class to work if 
+         // alluxio filesystem is NOT required
+         alluxioNotInstalled_ = true;
+      }
       codecFactory_ = new CompressionCodecFactory(config_); 
       System.loadLibrary("executor");
    }
@@ -142,21 +154,32 @@ public class HDFSClient
       HDFSRead() 
       {
       }
- 
+    
       public Object call() throws IOException 
       {
          int bytesRead;
          int totalBytesRead = 0;
          if (compressed_) {
             bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
-         } else 
-         if (! buf_.hasArray()) {
-            try {
-              fsdis_.seek(pos_);
-            } catch (EOFException e) {
-              isEOF_ = 1;
-              return new Integer(totalBytesRead);
-            } 
+         } 
+         else  {
+            // alluxio doesn't support direct ByteBuffer reads
+            // Hence, create a non-direct ByteBuffer, read into
+            // byteArray backing up this ByteBuffer and 
+            // then copy the data read to direct ByteBuffer for the 
+            // native layer to process the data
+            if ((! alluxioNotInstalled_) && fs_ instanceof 
alluxio.hadoop.FileSystem) {
+               savedBuf_ = buf_;
+               buf_ = ByteBuffer.allocate(savedBuf_.capacity());
+            }
+            if (! buf_.hasArray()) {
+               try {
+                  fsdis_.seek(pos_);
+               } catch (EOFException e) {
+                  isEOF_ = 1;
+                  return new Integer(totalBytesRead);
+               } 
+            }
          }
          do
          {
@@ -181,6 +204,12 @@ public class HDFSClient
             pos_ += bytesRead;
             lenRemain_ -= bytesRead;
          } while (lenRemain_ > 0); 
+         if ((! alluxioNotInstalled_) && fs_ instanceof 
alluxio.hadoop.FileSystem) {
+            if (totalBytesRead > 0) {
+               byte[] temp = buf_.array();
+               savedBuf_.put(temp, 0, totalBytesRead);
+            }
+         }
          return new Integer(totalBytesRead);
       }
     } 

Reply via email to