Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 6d2213d04 -> 1efa9dbdd


[TRAFODION-2821] Trafodion core code base needs to be thread safe

The method IDs of the different Java classes are now initialized
in a thread-safe manner in the JNI layer.

The missing code to initialize the thread-specific JNIEnv has been added.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/31041e0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/31041e0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/31041e0f

Branch: refs/heads/master
Commit: 31041e0f7630cea22d866e53fe6e8c7a4d046974
Parents: 19c7544
Author: selvaganesang <[email protected]>
Authored: Tue Nov 28 13:45:41 2017 +0000
Committer: selvaganesang <[email protected]>
Committed: Tue Nov 28 13:45:41 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/OrcFileReader.cpp      | 222 +++++++++++++++++++-------
 core/sql/executor/OrcFileReader.h        |   4 +
 core/sql/executor/SequenceFileReader.cpp | 113 +++++++------
 3 files changed, 235 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.cpp 
b/core/sql/executor/OrcFileReader.cpp
index 7696589..d63052d 100644
--- a/core/sql/executor/OrcFileReader.cpp
+++ b/core/sql/executor/OrcFileReader.cpp
@@ -30,6 +30,8 @@
 
 JavaMethodInit* OrcFileReader::JavaMethods_ = NULL;
 jclass OrcFileReader::javaClass_ = 0;
+bool OrcFileReader::javaMethodsInitialized_ = false;
+pthread_mutex_t OrcFileReader::javaMethodsInitMutex_ = 
PTHREAD_MUTEX_INITIALIZER;
 
 static const char* const sfrErrorEnumStr[] = 
 {
@@ -43,8 +45,10 @@ static const char* const sfrErrorEnumStr[] =
  ,"Java exception in isEOF()"
  ,"Java exception in fetchNextRow()"
  ,"Java exception in close()"
+ ,"Java exception in getRowNum()"
 };
 
+
 //////////////////////////////////////////////////////////////////////////////
 // 
 //////////////////////////////////////////////////////////////////////////////
@@ -70,11 +74,29 @@ OrcFileReader::~OrcFileReader()
 OFR_RetCode OrcFileReader::init()
 {
   static char className[]="org/trafodion/sql/OrcFileReader";
-  
-  if (JavaMethods_)
-    return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, 
JavaMethods_, (Int32)JM_LAST, TRUE);       
-  else
+
+  OFR_RetCode lv_retcode = OFR_OK;
+ 
+  QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
+               LL_DEBUG,
+               "Enter OrcFileReader::init()");
+
+  if (isInitialized())
+    return lv_retcode;
+
+  if (javaMethodsInitialized_)
+    return (OFR_RetCode)JavaObjectInterface::init(className, 
+                                                       javaClass_, 
+                                                       JavaMethods_, 
+                                                        (Int32)JM_LAST, 
javaMethodsInitialized_);
+  else 
   {
+    pthread_mutex_lock(&javaMethodsInitMutex_);
+    if (javaMethodsInitialized_)
+    {
+      pthread_mutex_unlock(&javaMethodsInitMutex_);
+      return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, 
JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
+    }
     JavaMethods_ = new JavaMethodInit[JM_LAST];
     
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
@@ -111,9 +133,15 @@ OFR_RetCode OrcFileReader::init()
 //    JavaMethods_[JM_FETCHBUFF2].jm_signature = "(II)[Ljava/lang/String;";
     JavaMethods_[JM_CLOSE     ].jm_name      = "close";
     JavaMethods_[JM_CLOSE     ].jm_signature = "()Ljava/lang/String;";
-   
-    return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, 
JavaMethods_, (Int32)JM_LAST, FALSE);
+    
+    lv_retcode = (OFR_RetCode)JavaObjectInterface::init(className,
+                                                       javaClass_,
+                                                       JavaMethods_,
+                                                       (Int32)JM_LAST, 
javaMethodsInitialized_);
+    javaMethodsInitialized_ = TRUE;
+    pthread_mutex_unlock(&javaMethodsInitMutex_);
   }
+  return lv_retcode;
 }
        
 //////////////////////////////////////////////////////////////////////////////
@@ -122,24 +150,28 @@ OFR_RetCode OrcFileReader::init()
 OFR_RetCode OrcFileReader::open(const char* path)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, 
"OrcFileReader::open(%s) called.", path);
+
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_OPEN_PARAM;
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) 
+  {
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_OPEN_PARAM;
-
+  }
   // String open(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_OPEN].methodID, js_path);
 
-  jenv_->DeleteLocalRef(js_path);  
-
-  if (jresult != NULL)
+  if (jenv_->ExceptionCheck())
   {
-       const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE);
-       printf("open error: %s\n", my_string);
+    getExceptionDetails();
     logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::open()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_OPEN_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -149,18 +181,32 @@ OFR_RetCode OrcFileReader::open(const char* path)
 OFR_RetCode OrcFileReader::getPosition(Int64& pos)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, 
"OrcFileReader::getPosition(%ld) called.", pos);
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_GETPOS_EXCEPTION;
 
   // long getPosition();
   tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
   Int64 result = jenv_->CallLongMethod(javaObj_, 
JavaMethods_[JM_GETPOS].methodID);
 
-  if (result == -1) 
+  if (jenv_->ExceptionCheck())
   {
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
     logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::getPosition()", 
getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_GETPOS_EXCEPTION;
+  }
+
+  if (result == -1) {
+    logError(CAT_SQL_HDFS_ORC_FILE_READER,
+            "OrcFileReader::getPosition()",
+            getLastError());
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_GETPOS_EXCEPTION;
   }
 
   pos = result;
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -169,23 +215,41 @@ OFR_RetCode OrcFileReader::getPosition(Int64& pos)
 //////////////////////////////////////////////////////////////////////////////
 OFR_RetCode OrcFileReader::seeknSync(Int64 pos)
 {
-       Int64 orcPos;
+  Int64 orcPos;
+
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_SYNC_EXCEPTION;
        
-       orcPos = pos -1;        //When you position in ORC, reading the NEXT 
row will be one greater than what you wanted.
+  orcPos = pos -1;     //When you position in ORC, reading the NEXT row will 
be one greater than what you wanted.
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, 
"OrcFileReader::seeknSync(%ld) called.", pos);
 
   // String seeknSync(long);
   tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_SYNC].methodID, orcPos);
 
-  if (jresult != NULL)
+  if (jenv_->ExceptionCheck())
   {
-       const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE);
-       printf("seeknSync error: %s\n", my_string);
-    logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::seeknSync()", 
jresult);
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_SYNC_EXCEPTION;
   }
-  
+
+  if (jresult != NULL) {
+    const char *my_string = jenv_->GetStringUTFChars(jresult,
+                                                    JNI_FALSE);
+    QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
+                 LL_DEBUG,
+                 "OrcFileReader::seeknSync(%ld) error: %s\n",
+                 pos,
+                 my_string);
+    logError(CAT_SQL_HDFS_ORC_FILE_READER,
+            "OrcFileReader::seeknSync()",
+            jresult);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_SYNC_EXCEPTION;
+  }
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -196,11 +260,22 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, 
"OrcFileReader::isEOF() called.");
 
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_ISEOF_EXCEPTION;
   // boolean isEOF();
   tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
-  bool result = jenv_->CallBooleanMethod(javaObj_, 
JavaMethods_[JM_ISEOF].methodID);
+  bool result = jenv_->CallBooleanMethod(javaObj_,
+                                        JavaMethods_[JM_ISEOF].methodID);
 
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_ISEOF_EXCEPTION;
+  }
   isEOF = result;
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -211,11 +286,14 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
 OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, 
long& rowNumber, int& num_columns)
 {
 /*
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_FETCHROW_EXCEPTION;
   // java.lang.String fetchNextRow(long stopOffset);
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
   if (jresult==NULL && getLastError()) 
   {
     logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", 
getLastError());
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_FETCHROW_EXCEPTION;
   }
 
@@ -227,7 +305,7 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, 
long& array_length, long&
   const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
   strcpy(buffer, char_result);
   jenv_->ReleaseStringUTFChars(jresult, char_result);
-  jenv_->DeleteLocalRef(jresult);  
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 */
 
@@ -236,58 +314,71 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, 
long& array_length, long&
 
         tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
        jobject jresult = (jobject)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_FETCHROW2].methodID);
+    if (jenv_->ExceptionCheck()) 
+    {
+      getExceptionDetails();
+      logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+      logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", 
getLastError());
+      jenv_->PopLocalFrame(NULL);
+      return OFR_ERROR_FETCHROW_EXCEPTION;
+    }
        if (jresult==NULL && getLastError()) 
        {
-                 logError(CAT_SQL_HDFS_ORC_FILE_READER, 
"OrcFileReader::fetchNextRow()", getLastError());
-                 return OFR_ERROR_FETCHROW_EXCEPTION;
+         logError(CAT_SQL_HDFS_ORC_FILE_READER, 
"OrcFileReader::fetchNextRow()", getLastError());
+         return OFR_ERROR_FETCHROW_EXCEPTION;
        }
 
        if (jresult == NULL)
-               return (OFR_NOMORE);            //No more rows
+          return (OFR_NOMORE);         //No more rows
 
 //Retrieve row and associated data             
        jclass cls = jenv_->GetObjectClass(jresult);
        
        fid = jenv_->GetFieldID(cls,"m_row_length","I");
        if (fid ==NULL)
-               {
-                       return (OFR_ERROR_FETCHROW_EXCEPTION);
-               }               
+       {
+           jenv_->PopLocalFrame(NULL);
+          return (OFR_ERROR_FETCHROW_EXCEPTION);
+       }               
        jint row_length = (jint)jenv_->GetIntField(jresult, fid);
        array_length = (long)row_length;
 
-       
        fid = jenv_->GetFieldID(cls,"m_column_count","I");
        if (fid ==NULL)
-               {
-                       return(OFR_ERROR_FETCHROW_EXCEPTION);
-               }
+       {
+           jenv_->PopLocalFrame(NULL);
+           return(OFR_ERROR_FETCHROW_EXCEPTION);
+       }
        jint column_count = (jint)jenv_->GetIntField(jresult, fid);
        num_columns = column_count;
 
        fid = jenv_->GetFieldID(cls,"m_row_number","J");
        if (fid ==NULL)
-               {
-                       return(OFR_ERROR_FETCHROW_EXCEPTION);
-               }
+       {
+           jenv_->PopLocalFrame(NULL);
+          return(OFR_ERROR_FETCHROW_EXCEPTION);
+       }
        jlong rowNum = (jlong)jenv_->GetIntField(jresult, fid);
        rowNumber = rowNum;
-
-       
 // Get the actual row (it is a byte array). Use the row_length above to 
specify how much to copy       
        fid = jenv_->GetFieldID(cls,"m_row_ba","[B");
        if (fid ==NULL)
-               {
-                       return (OFR_ERROR_FETCHROW_EXCEPTION);
-               }
+       {
+           jenv_->PopLocalFrame(NULL);
+          return (OFR_ERROR_FETCHROW_EXCEPTION);
+       }
        jbyteArray jrow = (jbyteArray)jenv_->GetObjectField(jresult, fid);
 
-  if (jrow == NULL)
-               return (OFR_ERROR_FETCHROW_EXCEPTION);
+        if (jrow == NULL)
+        {
+           jenv_->PopLocalFrame(NULL);
+           return (OFR_ERROR_FETCHROW_EXCEPTION);
+        }
 
-               jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
-               jenv_->DeleteLocalRef(jrow);  
+       jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
+       jenv_->DeleteLocalRef(jrow);  
 
+  jenv_->PopLocalFrame(NULL);
   return (OFR_OK);
 }
 
@@ -297,22 +388,31 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, 
long& array_length, long&
 OFR_RetCode OrcFileReader::close()
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, 
"OrcFileReader::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return OFR_OK;
-  }
+
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_GETPOS_EXCEPTION;
     
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_CLOSE].methodID);
 
-  if (jresult!=NULL) 
+  if (jenv_->ExceptionCheck()) 
   {
-    logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::close()", jresult);
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_CLOSE_EXCEPTION;
+  }
+
+  if (jresult!=NULL) {
+    logError(CAT_SQL_HDFS_ORC_FILE_READER,
+            "OrcFileReader::close()",
+            jresult);
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_CLOSE_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -322,16 +422,22 @@ OFR_RetCode OrcFileReader::close()
 OFR_RetCode OrcFileReader::getRowCount(Int64& count)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, 
"OrcFileReader::getRowCount() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return OFR_OK;
-  }
-    
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_GETNUMROWS_EXCEPTION;
+
   tsRecentJMFromJNI = JavaMethods_[JM_GETNUMROWS].jm_full_name;
   jlong jresult = (jlong)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_GETNUMROWS].methodID);
   count = jresult;
   
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_GETNUMROWS_EXCEPTION;
+  }
+
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -388,5 +494,5 @@ Removed until implemented
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "  =>Returning %d, 
read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
   return retCode;
 */
-       return (OFR_NOMORE);
+  return OFR_OK;
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.h 
b/core/sql/executor/OrcFileReader.h
index c9c491b..925456b 100644
--- a/core/sql/executor/OrcFileReader.h
+++ b/core/sql/executor/OrcFileReader.h
@@ -42,6 +42,7 @@ typedef enum {
  ,OFR_ERROR_ISEOF_EXCEPTION     // Java exception in isEOF()
  ,OFR_ERROR_FETCHROW_EXCEPTION  // Java exception in fetchNextRow()
  ,OFR_ERROR_CLOSE_EXCEPTION     // Java exception in close()
+ ,OFR_ERROR_GETNUMROWS_EXCEPTION
  ,OFR_LAST
 } OFR_RetCode;
 
@@ -115,6 +116,9 @@ private:
  
   static jclass javaClass_;
   static JavaMethodInit* JavaMethods_;
+  static bool javaMethodsInitialized_;
+  // this mutex protects both JavaMethods_ and javaClass_ initialization
+  static pthread_mutex_t javaMethodsInitMutex_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/SequenceFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.cpp 
b/core/sql/executor/SequenceFileReader.cpp
index 0ef0399..a67db87 100644
--- a/core/sql/executor/SequenceFileReader.cpp
+++ b/core/sql/executor/SequenceFileReader.cpp
@@ -176,9 +176,12 @@ SFR_RetCode SequenceFileReader::init()
 SFR_RetCode SequenceFileReader::open(const char* path)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, 
"SequenceFileReader::open(%s) called.", path);
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_OPEN_PARAM;
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) 
-    return SFR_ERROR_OPEN_PARAM;
+     jenv_->PopLocalFrame(NULL);
+     return SFR_ERROR_OPEN_PARAM;
 
   // String open(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
@@ -189,9 +192,10 @@ SFR_RetCode SequenceFileReader::open(const char* path)
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", 
jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_OPEN_EXCEPTION;
   }
-  
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -202,6 +206,9 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, 
"SequenceFileReader::getPosition(%ld) called.", pos);
 
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_GETPOS_EXCEPTION;
+
   // long getPosition();
   tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
   Int64 result = jenv_->CallLongMethod(javaObj_, 
JavaMethods_[JM_GETPOS].methodID);
@@ -209,10 +216,12 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
   if (result == -1) 
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, 
"SequenceFileReader::getPosition()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_GETPOS_EXCEPTION;
   }
 
   pos = result;
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -223,6 +232,9 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, 
"SequenceFileReader::seeknSync(%ld) called.", pos);
 
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_GETPOS_EXCEPTION;
+
   // String seeknSync(long);
   tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_SYNC].methodID, pos);
@@ -230,9 +242,11 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::seeknSync()", 
jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_SYNC_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -243,10 +257,13 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, 
"SequenceFileReader::isEOF() called.");
 
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_ISEOF_EXCEPTION;
   // boolean isEOF();
   tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
   bool result = jenv_->CallBooleanMethod(javaObj_, 
JavaMethods_[JM_ISEOF].methodID);
 
+  jenv_->PopLocalFrame(NULL);
   isEOF = result;
   return SFR_OK;
 }
@@ -279,24 +296,29 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
 //////////////////////////////////////////////////////////////////////////////
 SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
 {
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_FETCHROW_EXCEPTION;
+
   // java.lang.String fetchNextRow(long stopOffset);
   tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
   if (jresult==NULL && getLastError()) 
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, 
"SequenceFileReader::fetchNextRow()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_FETCHROW_EXCEPTION;
   }
 
   if (jresult == NULL)
   {
+    jenv_->PopLocalFrame(NULL);
     return SFR_NOMORE;
   }
   
   const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
   strcpy(buffer, char_result);
   jenv_->ReleaseStringUTFChars(jresult, char_result);
-  jenv_->DeleteLocalRef(jresult);  
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -306,11 +328,9 @@ SFR_RetCode SequenceFileReader::fetchNextRow(Int64 
stopOffset, char* buffer)
 SFR_RetCode SequenceFileReader::close()
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, 
"SequenceFileReader::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return SFR_OK;
-  }
+
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_CLOSE_EXCEPTION;
     
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
@@ -319,9 +339,11 @@ SFR_RetCode SequenceFileReader::close()
   if (jresult!=NULL) 
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::close()", 
jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_CLOSE_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -535,22 +557,26 @@ SFW_RetCode SequenceFileWriter::init()
 SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType 
compression)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, 
"SequenceFileWriter::open(%s) called.", path);
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_OPEN_PARAM;
   jstring js_path = jenv_->NewStringUTF(path);
-  if (js_path == NULL) 
-    return SFW_ERROR_OPEN_PARAM;
+  if (js_path == NULL) {
+     jenv_->PopLocalFrame(NULL);
+     return SFW_ERROR_OPEN_PARAM;
+  }
 
   // String open(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_OPEN].methodID, js_path, compression);
 
-  jenv_->DeleteLocalRef(js_path);  
-
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::open()", 
jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_OPEN_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
@@ -560,9 +586,14 @@ SFW_RetCode SequenceFileWriter::open(const char* path, 
SFW_CompType compression)
 SFW_RetCode SequenceFileWriter::write(const char* data)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, 
"SequenceFileWriter::write(%s) called.", data);
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_WRITE_PARAM;
+
   jstring js_data = jenv_->NewStringUTF(data);
-  if (js_data == NULL) 
+  if (js_data == NULL) {
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_WRITE_PARAM;
+  }
 
   // String write(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name;
@@ -573,9 +604,11 @@ SFW_RetCode SequenceFileWriter::write(const char* data)
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", 
jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_WRITE_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
@@ -616,12 +649,10 @@ SFW_RetCode SequenceFileWriter::writeBuffer(char* data, 
Int64 buffSize, const ch
 SFW_RetCode SequenceFileWriter::close()
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, 
"SequenceFileWriter::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return SFW_OK;
-  }
     
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_CLOSE_EXCEPTION;
+
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, 
JavaMethods_[JM_CLOSE].methodID);
@@ -629,10 +660,11 @@ SFW_RetCode SequenceFileWriter::close()
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::close()", 
jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_CLOSE_EXCEPTION;
   }
   
-  
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
@@ -644,10 +676,8 @@ SFW_RetCode SequenceFileWriter::hdfsCreate(const char* 
path, NABoolean compress)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, 
"SequenceFileWriter::hdfsCreate(%s) called.", path);
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
-     return SFW_ERROR_HDFS_WRITE_EXCEPTION;
-  }
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_HDFS_CREATE_PARAM;
 
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) {
@@ -688,10 +718,8 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* 
data, Int64 len)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, 
"SequenceFileWriter::hdfsWrite(%ld) called.", len);
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_WRITE_EXCEPTION;
-  }
 
   //Write the requisite bytes into the file
   jbyteArray jbArray = jenv_->NewByteArray( len);
@@ -733,11 +761,9 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* 
data, Int64 len)
 SFW_RetCode SequenceFileWriter::hdfsClose()
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, 
"SequenceFileWriter::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return SFW_OK;
-  }
+
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
 
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name;
@@ -748,27 +774,27 @@ SFW_RetCode SequenceFileWriter::hdfsClose()
     getExceptionDetails();
     logError(CAT_SQL_HBASE, __FILE__, __LINE__);
     logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
   }
 
   if (jresult == false)
   {
     logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
   }
 
-
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
 SFW_RetCode SequenceFileWriter::hdfsCleanUnloadPath( const NAString& uldPath)
 {
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, 
"SequenceFileWriter::hdfsCleanUnloadPath(%s) called.",
-                                                      uldPath.data());
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
-     return SFW_ERROR_HDFS_CLEANUP_EXCEPTION;
-  }
+                                                                             
uldPath.data());
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_HDFS_CLEANUP_PARAM;
 
   jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
   if (js_UldPath == NULL) {
@@ -799,10 +825,9 @@ SFW_RetCode SequenceFileWriter::hdfsMergeFiles( const 
NAString& srcPath,
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, 
"SequenceFileWriter::hdfsMergeFiles(%s, %s) called.",
                   srcPath.data(), dstPath.data());
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION;
-  }
+
   jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data());
 
   if (js_SrcPath == NULL) {
@@ -845,10 +870,8 @@ SFW_RetCode SequenceFileWriter::hdfsDeletePath( const 
NAString& delPath)
 {
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, 
"SequenceFileWriter::hdfsDeletePath(%s called.",
                   delPath.data());
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION;
-  }
 
   jstring js_delPath = jenv_->NewStringUTF(delPath.data());
   if (js_delPath == NULL) {
@@ -887,10 +910,8 @@ SFW_RetCode SequenceFileWriter::hdfsExists( const 
NAString& uldPath, NABoolean &
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsExists(%s) 
called.",
                                                       uldPath.data());
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_EXISTS_EXCEPTION;
-  }
 
   jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
   if (js_UldPath == NULL) {

Reply via email to