Repository: trafodion
Updated Branches:
  refs/heads/master d48a88741 -> e618aaf3d


[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text 
formatted hive tables


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/ac706607
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/ac706607
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/ac706607

Branch: refs/heads/master
Commit: ac7066074611a09da33adf88673c2f023e7dda7d
Parents: 75c7b39
Author: selvaganesang <[email protected]>
Authored: Wed Feb 28 19:27:40 2018 +0000
Committer: selvaganesang <[email protected]>
Committed: Wed Feb 28 21:57:59 2018 +0000

----------------------------------------------------------------------
 core/sql/cli/Context.h                          |  1 +
 core/sql/cli/Globals.cpp                        |  1 +
 core/sql/cli/Globals.h                          |  1 +
 core/sql/comexe/ComTdbHdfsScan.h                |  5 +-
 core/sql/common/ComRtUtils.cpp                  |  1 +
 core/sql/executor/ExExeUtilGet.cpp              |  2 -
 core/sql/executor/ExFastTransport.cpp           |  7 +--
 core/sql/executor/ExHdfsScan.cpp                | 58 ++++++++++++++++++--
 core/sql/executor/ExHdfsScan.h                  | 23 ++++++++
 core/sql/executor/HBaseClient_JNI.cpp           |  1 -
 core/sql/executor/HdfsClient_JNI.cpp            | 28 +++++-----
 core/sql/exp/ExpLOBaccess.cpp                   |  1 +
 core/sql/optimizer/HDFSHook.cpp                 |  1 +
 core/sql/optimizer/NATable.cpp                  |  1 +
 core/sql/sqlcomp/DefaultConstants.h             |  1 +
 .../main/java/org/trafodion/sql/HDFSClient.java | 13 +++--
 16 files changed, 112 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/cli/Context.h
----------------------------------------------------------------------
diff --git a/core/sql/cli/Context.h b/core/sql/cli/Context.h
index 973a647..bcfd06b 100644
--- a/core/sql/cli/Context.h
+++ b/core/sql/cli/Context.h
@@ -512,6 +512,7 @@ private:
   NAString jniErrorStr_; 
   HBaseClient_JNI *hbaseClientJNI_;
   HiveClient_JNI *hiveClientJNI_;
+  HdfsClient *hdfsClientJNI_;
 
   // this points to data used by trafSE (traf storage engine) that is context 
specific.
   // It points to a list of 'black box' of data allocated by user and is 
returned

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/cli/Globals.cpp
----------------------------------------------------------------------
diff --git a/core/sql/cli/Globals.cpp b/core/sql/cli/Globals.cpp
index ed30502..fb09008 100644
--- a/core/sql/cli/Globals.cpp
+++ b/core/sql/cli/Globals.cpp
@@ -60,6 +60,7 @@
 #include <semaphore.h>
 #include <pthread.h>
 #include "HBaseClient_JNI.h"
+#include "HdfsClient_JNI.h"
 #include "LmLangManagerC.h"
 #include "LmLangManagerJava.h"
 #include "CliSemaphore.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/cli/Globals.h
----------------------------------------------------------------------
diff --git a/core/sql/cli/Globals.h b/core/sql/cli/Globals.h
index 284d992..b8e04ea 100644
--- a/core/sql/cli/Globals.h
+++ b/core/sql/cli/Globals.h
@@ -88,6 +88,7 @@ class CliGlobals;
 class CLISemaphore;
 class HBaseClient_JNI;
 class HiveClient_JNI;
+class HdfsClient;
 class TransMode;
 class ContextTidMap;
 class LmLanguageManager;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/comexe/ComTdbHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h
index 86534be..46d7f2f 100755
--- a/core/sql/comexe/ComTdbHdfsScan.h
+++ b/core/sql/comexe/ComTdbHdfsScan.h
@@ -54,7 +54,8 @@ class ComTdbHdfsScan : public ComTdb
     CONTINUE_ON_ERROR           = 0x0020,
     LOG_ERROR_ROWS              = 0x0040,
     ASSIGN_RANGES_AT_RUNTIME    = 0x0080,
-    USE_LIBHDFS_SCAN            = 0x0100
+    TREAT_EMPTY_AS_NULL         = 0x0100,
+    USE_LIBHDFS_SCAN            = 0x0200
   };
 
   // Expression to filter rows.
@@ -288,7 +289,7 @@ public:
   {(v ? flags_ |= USE_LIBHDFS_SCAN : flags_ &= ~USE_LIBHDFS_SCAN); }
   NABoolean getUseLibhdfsScan() const
                                 { return (flags_ & USE_LIBHDFS_SCAN) != 0; }
-  
+
   UInt32 getMaxErrorRows() const { return maxErrorRows_;}
   void setMaxErrorRows(UInt32 v ) { maxErrorRows_= v; }
   

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/common/ComRtUtils.cpp
----------------------------------------------------------------------
diff --git a/core/sql/common/ComRtUtils.cpp b/core/sql/common/ComRtUtils.cpp
index f2619f9..35f9ca7 100644
--- a/core/sql/common/ComRtUtils.cpp
+++ b/core/sql/common/ComRtUtils.cpp
@@ -81,6 +81,7 @@
 #include "seabed/ms.h"
 #include "seabed/fs.h"
 
+#include "HdfsClient_JNI.h"
 struct ModName {
 public:
   const char * name;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExExeUtilGet.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilGet.cpp 
b/core/sql/executor/ExExeUtilGet.cpp
index 539a8cf..cd20fae 100644
--- a/core/sql/executor/ExExeUtilGet.cpp
+++ b/core/sql/executor/ExExeUtilGet.cpp
@@ -6103,7 +6103,6 @@ ExExeUtilRegionStatsTcb::ExExeUtilRegionStatsTcb(
   ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
                                        (char*)"", //exe_util_tdb.server(), 
                                        (char*)""); //exe_util_tdb.zkPort(),
-
   regionInfoList_ = NULL;
   
   tableName_ = new(glob->getDefaultHeap()) char[2000];
@@ -6876,7 +6875,6 @@ ExExeUtilClusterStatsTcb::ExExeUtilClusterStatsTcb(
   ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
                                        (char*)"", //exe_util_tdb.server(), 
                                        (char*)""); //exe_util_tdb.zkPort());
-
   regionInfoList_ = NULL;
   
   // get hbase rootdir location. Max linux pathlength is 1024.

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExFastTransport.cpp 
b/core/sql/executor/ExFastTransport.cpp
index 9d43545..ba4cfbb 100644
--- a/core/sql/executor/ExFastTransport.cpp
+++ b/core/sql/executor/ExFastTransport.cpp
@@ -827,12 +827,11 @@ ExWorkProcRetcode ExHdfsFastExtractTcb::work()
           }
         }
       else
-        {
+      {
           updateWorkATPDiagsArea(__FILE__,__LINE__,"sockets are not 
supported");
           pstate.step_ = EXTRACT_ERROR;
           break;
-        }
-
+      }
       for (UInt32 i = 0; i < myTdb().getChildTuple()->numAttrs(); i++)
       {
         Attributes * attr = myTdb().getChildTableAttr(i);
@@ -1290,7 +1289,7 @@ void 
ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 hdfsClientRetCode)
                   (ExeErrorCode)(8447),
                   NULL, NULL, NULL, NULL,
                   errorMsg,
-                (char *)currContext->getJniErrorStr().data());
+                  (char *)currContext->getJniErrorStr().data());
   updateWorkATPDiagsArea(diagsArea);
 }
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index 2b73feb..3ff153e 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -317,7 +317,6 @@ void ExHdfsScanTcb::freeResources()
   if (hdfsScan_ != NULL) 
      NADELETE(hdfsScan_, HdfsScan, getHeap());
 }
-
 NABoolean ExHdfsScanTcb::needStatsEntry()
 {
   // stats are collected for ALL and OPERATOR options.
@@ -1099,15 +1098,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 if ((BYTE *)startOfNextRow > bufLogicalEnd_) {
                    step_ = TRAF_HDFS_READ;
                    hdfsBufNextRow_ = NULL;
-                  if (!exception_)
-                      break;
                 }
                 else
                  hdfsBufNextRow_ = startOfNextRow;
-             }
+            }
            
          }
-
+           
          if (exception_)
          {
            nextStep_ = step_;
@@ -2019,6 +2016,54 @@ void ExHdfsScanTcb::deallocateRuntimeRanges()
     }
 }
 
+void ExHdfsScanTcb::handleException(NAHeap *heap,
+                                    char *logErrorRow,
+                                    Lng32 logErrorRowLen,
+                                    ComCondition *errorCond)
+{
+  Lng32 errorMsgLen = 0;
+  charBuf *cBuf = NULL;
+  char *errorMsg;
+  HDFS_Client_RetCode hdfsClientRetcode;
+
+  if (loggingErrorDiags_ != NULL)
+     return;
+
+  if (!loggingFileCreated_) {
+     hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), 
hdfsClientRetcode);
+     if (hdfsClientRetcode == HDFS_CLIENT_OK)
+        hdfsClientRetcode = hdfsClient_->hdfsCreate(loggingFileName_, FALSE);
+     if (hdfsClientRetcode == HDFS_CLIENT_OK)
+        loggingFileCreated_ = TRUE;
+     else 
+        goto logErrorReturn;
+  }
+  hdfsClientRetcode = hdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen);
+  if (hdfsClientRetcode != HDFS_CLIENT_OK) 
+     goto logErrorReturn;
+  if (errorCond != NULL) {
+     errorMsgLen = errorCond->getMessageLength();
+     const NAWcharBuf wBuf((NAWchar*)errorCond->getMessageText(), errorMsgLen, 
heap);
+     cBuf = unicodeToISO88591(wBuf, heap, cBuf);
+     errorMsg = (char *)cBuf->data();
+     errorMsgLen = cBuf -> getStrLen();
+     errorMsg[errorMsgLen]='\n';
+     errorMsgLen++;
+  }
+  else {
+     errorMsg = (char *)"[UNKNOWN EXCEPTION]\n";
+     errorMsgLen = strlen(errorMsg);
+  }
+  hdfsClientRetcode = hdfsClient_->hdfsWrite(errorMsg, errorMsgLen);
+logErrorReturn:
+  if (hdfsClientRetcode != HDFS_CLIENT_OK) {
+     loggingErrorDiags_ = ComDiagsArea::allocate(heap);
+     *loggingErrorDiags_ << DgSqlCode(EXE_ERROR_WHILE_LOGGING)
+                 << DgString0(loggingFileName_)
+                 << DgString1((char 
*)GetCliGlobals()->currContext()->getJniErrorStr().data());
+  }
+}
+
 short ExHdfsScanTcb::moveRowToUpQueue(const char * row, Lng32 len, 
                                       short * rc, NABoolean isVarchar)
 {
@@ -2119,7 +2164,7 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc)
 
   return 0;
 }
-
+/*
 void ExHdfsScanTcb::handleException(NAHeap *heap,
                                     char *logErrorRow,
                                     Lng32 logErrorRowLen,
@@ -2167,6 +2212,7 @@ logErrorReturn:
                  << DgString1((char 
*)GetCliGlobals()->currContext()->getJniErrorStr().data());
   }
 }
+*/
 
 ////////////////////////////////////////////////////////////////////////
 // ORC files

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h
index f4ad7e1..04b632e 100644
--- a/core/sql/executor/ExHdfsScan.h
+++ b/core/sql/executor/ExHdfsScan.h
@@ -135,6 +135,29 @@ class ExHdfsScanTcb  : public ex_tcb
    
 public:
   enum
+/*
+   USE_LIBHDFS_SCAN - OFF enables hdfs access via java classes 
+      org.trafodion.sql.HdfsScan and org.trafodion.sql.HdfsClient
+   Steps involved:
+   1. Create a new HdfsScan object and set the scan ranges of the fragment 
instance in it
+      The scan range involves the following and it is determined either at 
runtime or compile time
+         a) filename
+         b) offset
+         c) len
      Java layer always reads more than the len by rangeTailIOSize_ to 
accommodate the record split 
+   2. Two ByteBuffer objects are also passed to HdfsScan object. These 
ByteBuffers are backed up by
+      2 native buffers where the data is fetched. The buffer has a head room 
of size rangeTailIOSize_ and the 
+      data is always read after the head room. 
+   3. HdfsScan returns an int array containing bytesRead, bufNo, rangeNo, 
isEOF and schedules either
+      the remaining bytes to be read or the next range using ByteBuffers 
alternatively.
+   4. HdfsScan returns null array when there is no more data to be read.
+   5. When the data is processed in one ByteBuffer in the native thread, the 
data is fetched into the other ByteBuffer by
+      another Java thread.
+   6. Native layer after processing all the rows in one ByteBuffer, moves the 
last incomplete row to head room of the
+      other ByteBuffer. Then it requests to check if the read is complete. The 
native layer processes the buffer starting
+      from the copied incomplete row.
+*/
+
   {
     BYTES_COMPLETED,
     BUF_NO,

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/HBaseClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HBaseClient_JNI.cpp 
b/core/sql/executor/HBaseClient_JNI.cpp
index 6b400cd..8d12821 100644
--- a/core/sql/executor/HBaseClient_JNI.cpp
+++ b/core/sql/executor/HBaseClient_JNI.cpp
@@ -5166,4 +5166,3 @@ void deleteNAArray(CollHeap *heap, NAArray<HbaseStr> 
*array)
   }
   NADELETE(array, NAArray, heap);
 }
-                      

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp 
b/core/sql/executor/HdfsClient_JNI.cpp
index f08aa92..5f0f810 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -23,6 +23,7 @@
 
 #include "QRLogger.h"
 #include "Globals.h"
+#include "Context.h"
 #include "jni.h"
 #include "HdfsClient_JNI.h"
 
@@ -239,6 +240,7 @@ HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, 
ExHdfsScanStats *hdfsStat
    }
    if (j_retArray == NULL)
       return HDFS_SCAN_EOR;
+
    short retArrayLen = jenv_->GetArrayLength(j_retArray);
    ex_assert(retArrayLen == arrayLen, "HdfsScan::trafHdfsRead() InternalError: 
retArrayLen != arrayLen");
    jenv_->GetIntArrayRegion(j_retArray, 0, 4, retArray);
@@ -285,23 +287,11 @@ HdfsClient::~HdfsClient()
    deleteHdfsFileInfo();
 }
 
-void HdfsClient::deleteHdfsFileInfo()
-{
-   for (int i = 0; i < numFiles_ ; i ++) {
-      NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap());
-      NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap());
-      NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap());
-   }
-   NADELETEBASIC(hdfsFileInfo_, getHeap()); 
-   numFiles_ = 0;
-   hdfsFileInfo_ = NULL;
-}
-
 HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called.");
 
-   if (initJNIEnv() != JOI_OK)
+      if (initJNIEnv() != JOI_OK)
      return NULL;
    retCode = HDFS_CLIENT_OK;
    HdfsClient *hdfsClient = new (heap) HdfsClient(heap);
@@ -315,6 +305,18 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, 
HDFS_Client_RetCode &retCode)
    return hdfsClient;
 }
 
+void HdfsClient::deleteHdfsFileInfo()
+{
+   for (int i = 0; i < numFiles_ ; i ++) {
+      NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap());
+      NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap());
+      NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap());
+   }
+   NADELETEBASIC(hdfsFileInfo_, getHeap()); 
+   numFiles_ = 0;
+   hdfsFileInfo_ = NULL;
+}
+
 HDFS_Client_RetCode HdfsClient::init()
 {
   static char className[]="org/trafodion/sql/HDFSClient";

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/exp/ExpLOBaccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp
index b5a427b..481e960 100644
--- a/core/sql/exp/ExpLOBaccess.cpp
+++ b/core/sql/exp/ExpLOBaccess.cpp
@@ -63,6 +63,7 @@
 #include "ComQueue.h"
 #include "QRLogger.h"
 #include "NAMemory.h"
+#include "HdfsClient_JNI.h"
 #include <seabed/ms.h>
 #include <seabed/fserr.h>
 #include <curl/curl.h>

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/optimizer/HDFSHook.cpp
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/HDFSHook.cpp b/core/sql/optimizer/HDFSHook.cpp
index dfc80ad..c8793d9 100644
--- a/core/sql/optimizer/HDFSHook.cpp
+++ b/core/sql/optimizer/HDFSHook.cpp
@@ -29,6 +29,7 @@
 
 // for DNS name resolution
 #include <netdb.h>
+#include "HdfsClient_JNI.h"
 #include "Globals.h"
 #include "Context.h"
 // Initialize static variables

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/optimizer/NATable.cpp
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/NATable.cpp b/core/sql/optimizer/NATable.cpp
index d657a48..cb0cc39 100644
--- a/core/sql/optimizer/NATable.cpp
+++ b/core/sql/optimizer/NATable.cpp
@@ -87,6 +87,7 @@
 #define MAX_NODE_NAME 9
 
 #include "SqlParserGlobals.h"
+#include "HdfsClient_JNI.h"
 
 //#define __ROSETTA
 //#include "rosetta_ddl_include.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h 
b/core/sql/sqlcomp/DefaultConstants.h
index 2671482..fd110de 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3310,6 +3310,7 @@ enum DefaultConstants
 
   // Use the earlier implementation of HdfsScan via libhdfs
   USE_LIBHDFS_SCAN,
+  
   // This enum constant must be the LAST one in the list; it's a count,
   // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
   __NUM_DEFAULT_ATTRIBUTES

http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index fe116d7..52453cc 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -21,19 +21,20 @@
 
 package org.trafodion.sql;
 
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.Logger;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.conf.Configuration;
-import java.nio.ByteBuffer;
-import java.io.IOException;
 import java.io.EOFException;
-import java.io.OutputStream;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.Executors;
@@ -168,7 +169,7 @@ public class HDFSClient
       bytesRead = retObject.intValue();
       fsdis_.close();
       return bytesRead;
-   }
+   }  
 
    public int getRangeNo()
    {
@@ -392,3 +393,5 @@ public class HDFSClient
                         short permissions, long accessTime);
 
 }
+
+

Reply via email to