Repository: incubator-trafodion Updated Branches: refs/heads/master 6d2213d04 -> 1efa9dbdd
[TRAFODION-2821] Trafodion core code base needs to be thread safe. The method IDs of the different Java classes are now initialized in a thread-safe manner in the JNI layer. The missing code to initialize the thread-specific JNIEnv has been added. Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/31041e0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/31041e0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/31041e0f Branch: refs/heads/master Commit: 31041e0f7630cea22d866e53fe6e8c7a4d046974 Parents: 19c7544 Author: selvaganesang <[email protected]> Authored: Tue Nov 28 13:45:41 2017 +0000 Committer: selvaganesang <[email protected]> Committed: Tue Nov 28 13:45:41 2017 +0000 ---------------------------------------------------------------------- core/sql/executor/OrcFileReader.cpp | 222 +++++++++++++++++++------- core/sql/executor/OrcFileReader.h | 4 + core/sql/executor/SequenceFileReader.cpp | 113 +++++++------ 3 files changed, 235 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp index 7696589..d63052d 100644 --- a/core/sql/executor/OrcFileReader.cpp +++ b/core/sql/executor/OrcFileReader.cpp @@ -30,6 +30,8 @@ JavaMethodInit* OrcFileReader::JavaMethods_ = NULL; jclass OrcFileReader::javaClass_ = 0; +bool OrcFileReader::javaMethodsInitialized_ = false; +pthread_mutex_t OrcFileReader::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER; static const char* const sfrErrorEnumStr[] = { @@ -43,8 +45,10 @@ static const char* const sfrErrorEnumStr[] = ,"Java exception in isEOF()" ,"Java exception in fetchNextRow()" ,"Java exception 
in close()" + ,"Java exception in getRowNum()" }; + ////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////// @@ -70,11 +74,29 @@ OrcFileReader::~OrcFileReader() OFR_RetCode OrcFileReader::init() { static char className[]="org/trafodion/sql/OrcFileReader"; - - if (JavaMethods_) - return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, TRUE); - else + + OFR_RetCode lv_retcode = OFR_OK; + + QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, + LL_DEBUG, + "Enter OrcFileReader::init()"); + + if (isInitialized()) + return lv_retcode; + + if (javaMethodsInitialized_) + return (OFR_RetCode)JavaObjectInterface::init(className, + javaClass_, + JavaMethods_, + (Int32)JM_LAST, javaMethodsInitialized_); + else { + pthread_mutex_lock(&javaMethodsInitMutex_); + if (javaMethodsInitialized_) + { + pthread_mutex_unlock(&javaMethodsInitMutex_); + return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); + } JavaMethods_ = new JavaMethodInit[JM_LAST]; JavaMethods_[JM_CTOR ].jm_name = "<init>"; @@ -111,9 +133,15 @@ OFR_RetCode OrcFileReader::init() // JavaMethods_[JM_FETCHBUFF2].jm_signature = "(II)[Ljava/lang/String;"; JavaMethods_[JM_CLOSE ].jm_name = "close"; JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;"; - - return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, FALSE); + + lv_retcode = (OFR_RetCode)JavaObjectInterface::init(className, + javaClass_, + JavaMethods_, + (Int32)JM_LAST, javaMethodsInitialized_); + javaMethodsInitialized_ = TRUE; + pthread_mutex_unlock(&javaMethodsInitMutex_); } + return lv_retcode; } ////////////////////////////////////////////////////////////////////////////// @@ -122,24 +150,28 @@ OFR_RetCode OrcFileReader::init() OFR_RetCode OrcFileReader::open(const char* path) { 
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::open(%s) called.", path); + + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_OPEN_PARAM; jstring js_path = jenv_->NewStringUTF(path); if (js_path == NULL) + { + jenv_->PopLocalFrame(NULL); return OFR_ERROR_OPEN_PARAM; - + } // String open(java.lang.String); tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name; jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path); - jenv_->DeleteLocalRef(js_path); - - if (jresult != NULL) + if (jenv_->ExceptionCheck()) { - const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE); - printf("open error: %s\n", my_string); + getExceptionDetails(); logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::open()", jresult); + jenv_->PopLocalFrame(NULL); return OFR_ERROR_OPEN_EXCEPTION; } + jenv_->PopLocalFrame(NULL); return OFR_OK; } @@ -149,18 +181,32 @@ OFR_RetCode OrcFileReader::open(const char* path) OFR_RetCode OrcFileReader::getPosition(Int64& pos) { QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getPosition(%ld) called.", pos); + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_GETPOS_EXCEPTION; // long getPosition(); tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name; Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID); - if (result == -1) + if (jenv_->ExceptionCheck()) { + getExceptionDetails(); + logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__); logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::getPosition()", getLastError()); + jenv_->PopLocalFrame(NULL); + return OFR_ERROR_GETPOS_EXCEPTION; + } + + if (result == -1) { + logError(CAT_SQL_HDFS_ORC_FILE_READER, + "OrcFileReader::getPosition()", + getLastError()); + jenv_->PopLocalFrame(NULL); return OFR_ERROR_GETPOS_EXCEPTION; } pos = result; + jenv_->PopLocalFrame(NULL); return OFR_OK; } @@ -169,23 +215,41 @@ OFR_RetCode OrcFileReader::getPosition(Int64& pos) 
////////////////////////////////////////////////////////////////////////////// OFR_RetCode OrcFileReader::seeknSync(Int64 pos) { - Int64 orcPos; + Int64 orcPos; + + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_SYNC_EXCEPTION; - orcPos = pos -1; //When you position in ORC, reading the NEXT row will be one greater than what you wanted. + orcPos = pos -1; //When you position in ORC, reading the NEXT row will be one greater than what you wanted. QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::seeknSync(%ld) called.", pos); // String seeknSync(long); tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name; jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, orcPos); - if (jresult != NULL) + if (jenv_->ExceptionCheck()) { - const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE); - printf("seeknSync error: %s\n", my_string); - logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::seeknSync()", jresult); + getExceptionDetails(); + logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__); + jenv_->PopLocalFrame(NULL); return OFR_ERROR_SYNC_EXCEPTION; } - + + if (jresult != NULL) { + const char *my_string = jenv_->GetStringUTFChars(jresult, + JNI_FALSE); + QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, + LL_DEBUG, + "OrcFileReader::seeknSync(%ld) error: %s\n", + pos, + my_string); + logError(CAT_SQL_HDFS_ORC_FILE_READER, + "OrcFileReader::seeknSync()", + jresult); + jenv_->PopLocalFrame(NULL); + return OFR_ERROR_SYNC_EXCEPTION; + } + jenv_->PopLocalFrame(NULL); return OFR_OK; } @@ -196,11 +260,22 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF) { QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::isEOF() called."); + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_ISEOF_EXCEPTION; // boolean isEOF(); tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name; - bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID); + bool result = 
jenv_->CallBooleanMethod(javaObj_, + JavaMethods_[JM_ISEOF].methodID); + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(); + logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__); + jenv_->PopLocalFrame(NULL); + return OFR_ERROR_ISEOF_EXCEPTION; + } isEOF = result; + jenv_->PopLocalFrame(NULL); return OFR_OK; } @@ -211,11 +286,14 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF) OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& rowNumber, int& num_columns) { /* + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_FETCHROW_EXCEPTION; // java.lang.String fetchNextRow(long stopOffset); jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset); if (jresult==NULL && getLastError()) { logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError()); + jenv_->PopLocalFrame(NULL); return OFR_ERROR_FETCHROW_EXCEPTION; } @@ -227,7 +305,7 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& const char* char_result = jenv_->GetStringUTFChars(jresult, 0); strcpy(buffer, char_result); jenv_->ReleaseStringUTFChars(jresult, char_result); - jenv_->DeleteLocalRef(jresult); + jenv_->PopLocalFrame(NULL); return OFR_OK; */ @@ -236,58 +314,71 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name; jobject jresult = (jobject)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID); + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(); + logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__); + logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError()); + jenv_->PopLocalFrame(NULL); + return OFR_ERROR_FETCHROW_EXCEPTION; + } if (jresult==NULL && getLastError()) { - logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError()); - return OFR_ERROR_FETCHROW_EXCEPTION; + 
logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError()); + return OFR_ERROR_FETCHROW_EXCEPTION; } if (jresult == NULL) - return (OFR_NOMORE); //No more rows + return (OFR_NOMORE); //No more rows //Retrieve row and associated data jclass cls = jenv_->GetObjectClass(jresult); fid = jenv_->GetFieldID(cls,"m_row_length","I"); if (fid ==NULL) - { - return (OFR_ERROR_FETCHROW_EXCEPTION); - } + { + jenv_->PopLocalFrame(NULL); + return (OFR_ERROR_FETCHROW_EXCEPTION); + } jint row_length = (jint)jenv_->GetIntField(jresult, fid); array_length = (long)row_length; - fid = jenv_->GetFieldID(cls,"m_column_count","I"); if (fid ==NULL) - { - return(OFR_ERROR_FETCHROW_EXCEPTION); - } + { + jenv_->PopLocalFrame(NULL); + return(OFR_ERROR_FETCHROW_EXCEPTION); + } jint column_count = (jint)jenv_->GetIntField(jresult, fid); num_columns = column_count; fid = jenv_->GetFieldID(cls,"m_row_number","J"); if (fid ==NULL) - { - return(OFR_ERROR_FETCHROW_EXCEPTION); - } + { + jenv_->PopLocalFrame(NULL); + return(OFR_ERROR_FETCHROW_EXCEPTION); + } jlong rowNum = (jlong)jenv_->GetIntField(jresult, fid); rowNumber = rowNum; - - // Get the actual row (it is a byte array). 
Use the row_length above to specify how much to copy fid = jenv_->GetFieldID(cls,"m_row_ba","[B"); if (fid ==NULL) - { - return (OFR_ERROR_FETCHROW_EXCEPTION); - } + { + jenv_->PopLocalFrame(NULL); + return (OFR_ERROR_FETCHROW_EXCEPTION); + } jbyteArray jrow = (jbyteArray)jenv_->GetObjectField(jresult, fid); - if (jrow == NULL) - return (OFR_ERROR_FETCHROW_EXCEPTION); + if (jrow == NULL) + { + jenv_->PopLocalFrame(NULL); + return (OFR_ERROR_FETCHROW_EXCEPTION); + } - jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer); - jenv_->DeleteLocalRef(jrow); + jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer); + jenv_->DeleteLocalRef(jrow); + jenv_->PopLocalFrame(NULL); return (OFR_OK); } @@ -297,22 +388,31 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& OFR_RetCode OrcFileReader::close() { QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::close() called."); - if (javaObj_ == NULL) - { - // Maybe there was an initialization error. 
- return OFR_OK; - } + + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_GETPOS_EXCEPTION; // String close(); tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name; jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID); - if (jresult!=NULL) + if (jenv_->ExceptionCheck()) { - logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::close()", jresult); + getExceptionDetails(); + logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__); + jenv_->PopLocalFrame(NULL); + return OFR_ERROR_CLOSE_EXCEPTION; + } + + if (jresult!=NULL) { + logError(CAT_SQL_HDFS_ORC_FILE_READER, + "OrcFileReader::close()", + jresult); + jenv_->PopLocalFrame(NULL); return OFR_ERROR_CLOSE_EXCEPTION; } + jenv_->PopLocalFrame(NULL); return OFR_OK; } @@ -322,16 +422,22 @@ OFR_RetCode OrcFileReader::close() OFR_RetCode OrcFileReader::getRowCount(Int64& count) { QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getRowCount() called."); - if (javaObj_ == NULL) - { - // Maybe there was an initialization error. 
- return OFR_OK; - } - + if (initJNIEnv() != JOI_OK) + return OFR_ERROR_GETNUMROWS_EXCEPTION; + tsRecentJMFromJNI = JavaMethods_[JM_GETNUMROWS].jm_full_name; jlong jresult = (jlong)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_GETNUMROWS].methodID); count = jresult; + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(); + logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__); + jenv_->PopLocalFrame(NULL); + return OFR_ERROR_GETNUMROWS_EXCEPTION; + } + + jenv_->PopLocalFrame(NULL); return OFR_OK; } @@ -388,5 +494,5 @@ Removed until implemented QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, " =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead); return retCode; */ - return (OFR_NOMORE); + return OFR_OK; } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/OrcFileReader.h b/core/sql/executor/OrcFileReader.h index c9c491b..925456b 100644 --- a/core/sql/executor/OrcFileReader.h +++ b/core/sql/executor/OrcFileReader.h @@ -42,6 +42,7 @@ typedef enum { ,OFR_ERROR_ISEOF_EXCEPTION // Java exception in isEOF() ,OFR_ERROR_FETCHROW_EXCEPTION // Java exception in fetchNextRow() ,OFR_ERROR_CLOSE_EXCEPTION // Java exception in close() + ,OFR_ERROR_GETNUMROWS_EXCEPTION ,OFR_LAST } OFR_RetCode; @@ -115,6 +116,9 @@ private: static jclass javaClass_; static JavaMethodInit* JavaMethods_; + static bool javaMethodsInitialized_; + // this mutex protects both JaveMethods_ and javaClass_ initialization + static pthread_mutex_t javaMethodsInitMutex_; }; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/SequenceFileReader.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp index 0ef0399..a67db87 100644 --- 
a/core/sql/executor/SequenceFileReader.cpp +++ b/core/sql/executor/SequenceFileReader.cpp @@ -176,9 +176,12 @@ SFR_RetCode SequenceFileReader::init() SFR_RetCode SequenceFileReader::open(const char* path) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::open(%s) called.", path); + if (initJNIEnv() != JOI_OK) + return SFR_ERROR_OPEN_PARAM; jstring js_path = jenv_->NewStringUTF(path); if (js_path == NULL) - return SFR_ERROR_OPEN_PARAM; + jenv_->PopLocalFrame(NULL); + return SFR_ERROR_OPEN_PARAM; // String open(java.lang.String); tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name; @@ -189,9 +192,10 @@ SFR_RetCode SequenceFileReader::open(const char* path) if (jresult != NULL) { logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", jresult); + jenv_->PopLocalFrame(NULL); return SFR_ERROR_OPEN_EXCEPTION; } - + jenv_->PopLocalFrame(NULL); return SFR_OK; } @@ -202,6 +206,9 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::getPosition(%ld) called.", pos); + if (initJNIEnv() != JOI_OK) + return SFR_ERROR_GETPOS_EXCEPTION; + // long getPosition(); tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name; Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID); @@ -209,10 +216,12 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos) if (result == -1) { logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::getPosition()", getLastError()); + jenv_->PopLocalFrame(NULL); return SFR_ERROR_GETPOS_EXCEPTION; } pos = result; + jenv_->PopLocalFrame(NULL); return SFR_OK; } @@ -223,6 +232,9 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::seeknSync(%ld) called.", pos); + if (initJNIEnv() != JOI_OK) + return SFR_ERROR_GETPOS_EXCEPTION; + // String seeknSync(long); tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name; jstring jresult = 
(jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, pos); @@ -230,9 +242,11 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos) if (jresult != NULL) { logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::seeknSync()", jresult); + jenv_->PopLocalFrame(NULL); return SFR_ERROR_SYNC_EXCEPTION; } + jenv_->PopLocalFrame(NULL); return SFR_OK; } @@ -243,10 +257,13 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::isEOF() called."); + if (initJNIEnv() != JOI_OK) + return SFR_ERROR_ISEOF_EXCEPTION; // boolean isEOF(); tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name; bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID); + jenv_->PopLocalFrame(NULL); isEOF = result; return SFR_OK; } @@ -279,24 +296,29 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF) ////////////////////////////////////////////////////////////////////////////// SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer) { + if (initJNIEnv() != JOI_OK) + return SFR_ERROR_FETCHROW_EXCEPTION; + // java.lang.String fetchNextRow(long stopOffset); tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name; jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset); if (jresult==NULL && getLastError()) { logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::fetchNextRow()", getLastError()); + jenv_->PopLocalFrame(NULL); return SFR_ERROR_FETCHROW_EXCEPTION; } if (jresult == NULL) { + jenv_->PopLocalFrame(NULL); return SFR_NOMORE; } const char* char_result = jenv_->GetStringUTFChars(jresult, 0); strcpy(buffer, char_result); jenv_->ReleaseStringUTFChars(jresult, char_result); - jenv_->DeleteLocalRef(jresult); + jenv_->PopLocalFrame(NULL); return SFR_OK; } @@ -306,11 +328,9 @@ SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer) SFR_RetCode SequenceFileReader::close() { 
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::close() called."); - if (javaObj_ == NULL) - { - // Maybe there was an initialization error. - return SFR_OK; - } + + if (initJNIEnv() != JOI_OK) + return SFR_ERROR_CLOSE_EXCEPTION; // String close(); tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name; @@ -319,9 +339,11 @@ SFR_RetCode SequenceFileReader::close() if (jresult!=NULL) { logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::close()", jresult); + jenv_->PopLocalFrame(NULL); return SFR_ERROR_CLOSE_EXCEPTION; } + jenv_->PopLocalFrame(NULL); return SFR_OK; } @@ -535,22 +557,26 @@ SFW_RetCode SequenceFileWriter::init() SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::open(%s) called.", path); + if (initJNIEnv() != JOI_OK) + return SFW_ERROR_OPEN_PARAM; jstring js_path = jenv_->NewStringUTF(path); - if (js_path == NULL) - return SFW_ERROR_OPEN_PARAM; + if (js_path == NULL) { + jenv_->PopLocalFrame(NULL); + return SFW_ERROR_OPEN_PARAM; + } // String open(java.lang.String); tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name; jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path, compression); - jenv_->DeleteLocalRef(js_path); - if (jresult != NULL) { logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::open()", jresult); + jenv_->PopLocalFrame(NULL); return SFW_ERROR_OPEN_EXCEPTION; } + jenv_->PopLocalFrame(NULL); return SFW_OK; } @@ -560,9 +586,14 @@ SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression) SFW_RetCode SequenceFileWriter::write(const char* data) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::write(%s) called.", data); + if (initJNIEnv() != JOI_OK) + return SFW_ERROR_WRITE_PARAM; + jstring js_data = jenv_->NewStringUTF(data); - if (js_data == NULL) + if (js_data == NULL) { + 
jenv_->PopLocalFrame(NULL); return SFW_ERROR_WRITE_PARAM; + } // String write(java.lang.String); tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name; @@ -573,9 +604,11 @@ SFW_RetCode SequenceFileWriter::write(const char* data) if (jresult != NULL) { logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", jresult); + jenv_->PopLocalFrame(NULL); return SFW_ERROR_WRITE_EXCEPTION; } + jenv_->PopLocalFrame(NULL); return SFW_OK; } @@ -616,12 +649,10 @@ SFW_RetCode SequenceFileWriter::writeBuffer(char* data, Int64 buffSize, const ch SFW_RetCode SequenceFileWriter::close() { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called."); - if (javaObj_ == NULL) - { - // Maybe there was an initialization error. - return SFW_OK; - } + if (initJNIEnv() != JOI_OK) + return SFW_ERROR_CLOSE_EXCEPTION; + // String close(); tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name; jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID); @@ -629,10 +660,11 @@ SFW_RetCode SequenceFileWriter::close() if (jresult != NULL) { logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::close()", jresult); + jenv_->PopLocalFrame(NULL); return SFW_ERROR_CLOSE_EXCEPTION; } - + jenv_->PopLocalFrame(NULL); return SFW_OK; } @@ -644,10 +676,8 @@ SFW_RetCode SequenceFileWriter::hdfsCreate(const char* path, NABoolean compress) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsCreate(%s) called.", path); - if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - getExceptionDetails(); - return SFW_ERROR_HDFS_WRITE_EXCEPTION; - } + if (initJNIEnv() != JOI_OK) + return SFW_ERROR_HDFS_CREATE_PARAM; jstring js_path = jenv_->NewStringUTF(path); if (js_path == NULL) { @@ -688,10 +718,8 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len) { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsWrite(%ld) called.", len); - if 
(jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - getExceptionDetails(); + if (initJNIEnv() != JOI_OK) return SFW_ERROR_HDFS_WRITE_EXCEPTION; - } //Write the requisite bytes into the file jbyteArray jbArray = jenv_->NewByteArray( len); @@ -733,11 +761,9 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len) SFW_RetCode SequenceFileWriter::hdfsClose() { QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called."); - if (javaObj_ == NULL) - { - // Maybe there was an initialization error. - return SFW_OK; - } + + if (initJNIEnv() != JOI_OK) + return SFW_ERROR_HDFS_CLOSE_EXCEPTION; // String close(); tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name; @@ -748,27 +774,27 @@ SFW_RetCode SequenceFileWriter::hdfsClose() getExceptionDetails(); logError(CAT_SQL_HBASE, __FILE__, __LINE__); logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError()); + jenv_->PopLocalFrame(NULL); return SFW_ERROR_HDFS_CLOSE_EXCEPTION; } if (jresult == false) { logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError()); + jenv_->PopLocalFrame(NULL); return SFW_ERROR_HDFS_CLOSE_EXCEPTION; } - + jenv_->PopLocalFrame(NULL); return SFW_OK; } SFW_RetCode SequenceFileWriter::hdfsCleanUnloadPath( const NAString& uldPath) { QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsCleanUnloadPath(%s) called.", - uldPath.data()); - if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - getExceptionDetails(); - return SFW_ERROR_HDFS_CLEANUP_EXCEPTION; - } + uldPath.data()); + if (initJNIEnv() != JOI_OK) + return SFW_ERROR_HDFS_CLEANUP_PARAM; jstring js_UldPath = jenv_->NewStringUTF(uldPath.data()); if (js_UldPath == NULL) { @@ -799,10 +825,9 @@ SFW_RetCode SequenceFileWriter::hdfsMergeFiles( const NAString& srcPath, QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsMergeFiles(%s, %s) called.", srcPath.data(), dstPath.data()); - if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - 
getExceptionDetails(); + if (initJNIEnv() != JOI_OK) return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION; - } + jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data()); if (js_SrcPath == NULL) { @@ -845,10 +870,8 @@ SFW_RetCode SequenceFileWriter::hdfsDeletePath( const NAString& delPath) { QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsDeletePath(%s called.", delPath.data()); - if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - getExceptionDetails(); + if (initJNIEnv() != JOI_OK) return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION; - } jstring js_delPath = jenv_->NewStringUTF(delPath.data()); if (js_delPath == NULL) { @@ -887,10 +910,8 @@ SFW_RetCode SequenceFileWriter::hdfsExists( const NAString& uldPath, NABoolean & QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsExists(%s) called.", uldPath.data()); - if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - getExceptionDetails(); + if (initJNIEnv() != JOI_OK) return SFW_ERROR_HDFS_EXISTS_EXCEPTION; - } jstring js_UldPath = jenv_->NewStringUTF(uldPath.data()); if (js_UldPath == NULL) {
