[TRAFODION-2982] JNI HDFS interface should support varied sized large buffers for read/write
Changed the interim byte array size to be of int type to accommodate at least 64MB as per review comments. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/d9ee71e1 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/d9ee71e1 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/d9ee71e1 Branch: refs/heads/master Commit: d9ee71e10bf9054be28ca000aa64553abc00584f Parents: 96cab4d Author: selvaganesang <[email protected]> Authored: Wed May 9 23:35:46 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Wed May 9 23:35:46 2018 +0000 ---------------------------------------------------------------------- core/sql/comexe/ComCompressionInfo.cpp | 84 ++++++++++++++++++ core/sql/comexe/ComCompressionInfo.h | 91 ++++++++++++++++++++ core/sql/comexe/ComTdbFastTransport.cpp | 2 +- core/sql/comexe/ComTdbFastTransport.h | 8 +- core/sql/comexe/ComTdbHdfsScan.cpp | 2 +- core/sql/comexe/ComTdbHdfsScan.h | 9 +- core/sql/executor/ExHdfsScan.cpp | 2 +- core/sql/executor/HdfsClient_JNI.cpp | 21 ++--- core/sql/executor/HdfsClient_JNI.h | 12 +-- core/sql/nskgmake/comexe/Makefile | 3 +- .../main/java/org/trafodion/sql/HDFSClient.java | 10 +-- .../main/java/org/trafodion/sql/HdfsScan.java | 9 +- 12 files changed, 214 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComCompressionInfo.cpp ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComCompressionInfo.cpp b/core/sql/comexe/ComCompressionInfo.cpp new file mode 100644 index 0000000..75ae6ce --- /dev/null +++ b/core/sql/comexe/ComCompressionInfo.cpp @@ -0,0 +1,84 @@ +// @@@ START COPYRIGHT @@@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// @@@ END COPYRIGHT @@@ + +#include "ComCompressionInfo.h" + +ComCompressionInfo::~ComCompressionInfo() +{} + +void ComCompressionInfo::setCompressionMethod(const char *fileName) +{ + compressionMethod_ = getCompressionMethodFromFileName(fileName); +} + +ComCompressionInfo::CompressionMethod ComCompressionInfo::getCompressionMethodFromFileName( + const char *f) +{ + const char * ret = strcasestr(f, ".lzo_deflate"); + if (ret) + return LZO_DEFLATE; + ret = strcasestr(f, ".lzo"); + if (ret) + return LZOP; + ret = strcasestr(f, ".deflate"); + if (ret) + return DEFLATE; + ret = strcasestr(f, ".gz"); + if (ret) + return GZIP; + + return UNCOMPRESSED; +} + +// virtual methods overridden from NAVersionedObject base class + +char *ComCompressionInfo::findVTblPtr(short classID) +{ + char *vtblPtr; + GetVTblPtr(vtblPtr, ComCompressionInfo); + + return vtblPtr; +} + +unsigned char ComCompressionInfo::getClassVersionID() +{ + return 1; +} + +void ComCompressionInfo::populateImageVersionIDArray() +{ + setImageVersionID(0,getClassVersionID()); +} + +short ComCompressionInfo::getClassSize() +{ + return (short) sizeof(ComCompressionInfo); +} + +Long ComCompressionInfo::pack(void * space) +{ + return NAVersionedObject::pack(space); +} + +Lng32 ComCompressionInfo::unpack(void * base, 
void * reallocator) +{ + return NAVersionedObject::unpack(base, reallocator); +} http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComCompressionInfo.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComCompressionInfo.h b/core/sql/comexe/ComCompressionInfo.h new file mode 100644 index 0000000..3927bad --- /dev/null +++ b/core/sql/comexe/ComCompressionInfo.h @@ -0,0 +1,91 @@ +/********************************************************************** +// @@@ START COPYRIGHT @@@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// @@@ END COPYRIGHT @@@ +/* -*-C++-*- +**************************************************************************** +* +* File: ComCompressionInfo.h +* Description: Description of the compression method used, for now +* this is used for Hive tables, but it could be +* expanded to other objects. 
+* Created: 4/20/16 +* Language: C++ +* +**************************************************************************** +*/ + +#ifndef COM_COMPRESSION_INFO_H +#define COM_COMPRESSION_INFO_H + +#include "NAVersionedObject.h" + +class ComCompressionInfo : public NAVersionedObject +{ +public: + // Update the COMPRESSION_TYPE[] at org/trafodion/sql/HDFSClient.java when new enum is added + enum CompressionMethod + { UNKNOWN_COMPRESSION = 0, // unable to determine compression method + UNCOMPRESSED, // file is not compressed + LZO_DEFLATE, // using LZO deflate compression + DEFLATE, // using DEFLATE compression + GZIP, // using GZIP compression + LZOP, // using LZOP compression + SUPPORTED_COMPRESSIONS }; // Add any compression type above this line + + ComCompressionInfo(CompressionMethod cm = UNKNOWN_COMPRESSION) : + NAVersionedObject(-1), + compressionMethod_(cm) + {} + + virtual ~ComCompressionInfo(); + + bool operator==(const ComCompressionInfo &o) const + { return compressionMethod_ == o.compressionMethod_; } + + CompressionMethod getCompressionMethod() const { return compressionMethod_; } + void setCompressionMethod(const char *fileName); + + NABoolean isCompressed() const + { return (compressionMethod_ != UNCOMPRESSED && + compressionMethod_ != UNKNOWN_COMPRESSION ); } + + NABoolean splitsAllowed() const { return !isCompressed(); } + + // try to determine the compression method just from a file name + static CompressionMethod getCompressionMethodFromFileName(const char *f); + + // --------------------------------------------------------------------- + // Redefine virtual functions required for Versioning. 
+ //---------------------------------------------------------------------- + virtual char *findVTblPtr(short classID); + virtual unsigned char getClassVersionID(); + virtual void populateImageVersionIDArray(); + virtual short getClassSize(); + virtual Long pack(void * space); + virtual Lng32 unpack(void * base, void * reallocator); + +private: + + CompressionMethod compressionMethod_; + +}; + +#endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbFastTransport.cpp ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbFastTransport.cpp b/core/sql/comexe/ComTdbFastTransport.cpp index 90f635f..0b9cf9e 100644 --- a/core/sql/comexe/ComTdbFastTransport.cpp +++ b/core/sql/comexe/ComTdbFastTransport.cpp @@ -99,7 +99,7 @@ ComTdbFastExtract::ComTdbFastExtract( hdfsReplication_(replication), ioTimeout_(ioTimeout), childDataRowLen_(childDataRowLen), - hdfsIoByteArraySize_(0), + hdfsIoByteArraySizeInKB_(0), modTSforDir_(-1) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbFastTransport.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbFastTransport.h b/core/sql/comexe/ComTdbFastTransport.h index 0666953..2fb89cc 100644 --- a/core/sql/comexe/ComTdbFastTransport.h +++ b/core/sql/comexe/ComTdbFastTransport.h @@ -370,9 +370,9 @@ public: Int64 getModTSforDir() const { return modTSforDir_; } void setHdfsIoByteArraySize(int size) - { hdfsIoByteArraySize_ = size; } - UInt16 getHdfsIoByteArraySize() - { return hdfsIoByteArraySize_; } + { hdfsIoByteArraySizeInKB_ = size; } + Lng32 getHdfsIoByteArraySize() + { return hdfsIoByteArraySizeInKB_; } protected: NABasicPtr targetName_; // 00 - 07 NABasicPtr delimiter_; // 08 - 15 @@ -399,7 +399,7 @@ protected: UInt16 filler_; // 130 - 131 UInt32 childDataRowLen_; // 132 - 135 Int64 modTSforDir_; // 136 - 143 - UInt16 hdfsIoByteArraySize_; // 144 - 147 + Lng32 
hdfsIoByteArraySizeInKB_; // 144 - 147 // Make sure class size is a multiple of 8 char fillerComTdbFastTransport_[4]; // 148 - 151 http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.cpp b/core/sql/comexe/ComTdbHdfsScan.cpp index f5e2907..fb58b54 100755 --- a/core/sql/comexe/ComTdbHdfsScan.cpp +++ b/core/sql/comexe/ComTdbHdfsScan.cpp @@ -121,7 +121,7 @@ ComTdbHdfsScan::ComTdbHdfsScan( hdfsRootDir_(hdfsRootDir), modTSforDir_(modTSforDir), numOfPartCols_(numOfPartCols), - hdfsIoByteArraySize_(0), + hdfsIoByteArraySizeInKB_(0), hdfsDirsToCheck_(hdfsDirsToCheck) {}; http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h index f9a0afd..80a0280 100755 --- a/core/sql/comexe/ComTdbHdfsScan.h +++ b/core/sql/comexe/ComTdbHdfsScan.h @@ -136,14 +136,13 @@ class ComTdbHdfsScan : public ComTdb UInt16 origTuppIndex_; // 188 - 189 char fillersComTdbHdfsScan1_[2]; // 190 - 191 NABasicPtr nullFormat_; // 192 - 199 - UInt16 hdfsIoByteArraySize_; // 198 - 199 // next 4 params are used to check if data under hdfsFileDir // was modified after query was compiled. 
NABasicPtr hdfsRootDir_; // 200 - 207 Int64 modTSforDir_; // 208 - 215 Lng32 numOfPartCols_; // 216 - 219 - char fillersComTdbHdfsScan2_[4]; // 220 - 223 + Lng32 hdfsIoByteArraySizeInKB_; // 220 - 223 QueuePtr hdfsDirsToCheck_; // 224 - 231 public: @@ -364,9 +363,9 @@ public: char *hdfsRootDir() { return hdfsRootDir_; } void setHdfsIoByteArraySize(int size) - { hdfsIoByteArraySize_ = size; } - UInt16 getHdfsIoByteArraySize() - { return hdfsIoByteArraySize_; } + { hdfsIoByteArraySizeInKB_ = size; } + int getHdfsIoByteArraySize() + { return hdfsIoByteArraySizeInKB_; } }; inline ComTdb * ComTdbHdfsScan::getChildTdb() http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index e5d73dc..9c68aa7 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -569,7 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work() break; } hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, - hdfsScanTdb().hdfsIoByteArraySize_, + hdfsScanTdb().hdfsIoByteArraySizeInKB_, &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, hdfsStats_, hdfsScanRetCode); if (hdfsScanRetCode != HDFS_SCAN_OK) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index 5b8e850..72157bf 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -27,6 +27,7 @@ #include "jni.h" #include "HdfsClient_JNI.h" #include "org_trafodion_sql_HDFSClient.h" +#include "ComCompressionInfo.h" // =========================================================================== // ===== Class HdfsScan @@ -84,7 
+85,7 @@ HDFS_Scan_RetCode HdfsScan::init() JavaMethods_[JM_CTOR ].jm_name = "<init>"; JavaMethods_[JM_CTOR ].jm_signature = "()V"; JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges"; - JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;S[Ljava/lang/String;[J[J[I[S)V"; + JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[S)V"; JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead"; JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I"; JavaMethods_[JM_STOP].jm_name = "stop"; @@ -107,7 +108,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum) } ///////////////////////////////////////////////////////////////////////////// -HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize, +HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called."); @@ -139,7 +140,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan jenv_->PopLocalFrame(NULL); return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; } - jshort j_hdfsIoByteArraySize = hdfsIoByteArraySize; + jint j_hdfsIoByteArraySizeInKB = hdfsIoByteArraySizeInKB; jobjectArray j_filenames = NULL; jlongArray j_offsets = NULL; jlongArray j_lens = NULL; @@ -212,14 +213,14 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan } } short compressionMethod = (short)hdfo->getCompressionMethod(); - //ex_assert(compressionMethod >= 0 && compressionMethod <= ComCompressionInfo::LZOP, "Illegal CompressionMethod Value"); + ex_assert(compressionMethod >= 0 && compressionMethod < ComCompressionInfo::SUPPORTED_COMPRESSIONS, "Illegal CompressionMethod 
Value"); jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod); } if (hdfsStats_ != NULL) hdfsStats_->getHdfsTimer().start(); tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name; - jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_hdfsIoByteArraySize, + jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_hdfsIoByteArraySizeInKB, j_filenames, j_offsets, j_lens, j_rangenums, j_compress); if (hdfsStats_ != NULL) { hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); @@ -235,7 +236,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan } HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, - short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, + int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called."); @@ -247,7 +248,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs if (hdfsScan != NULL) { hdfsScanRetCode = hdfsScan->init(); if (hdfsScanRetCode == HDFS_SCAN_OK) - hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySize, + hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySizeInKB, hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); if (hdfsScanRetCode == HDFS_SCAN_OK) hdfsScan->setHdfsStats(hdfsStats); @@ -378,7 +379,7 @@ void HdfsClient::deleteHdfsFileInfo() hdfsFileInfo_ = NULL; } -HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, short hdfsIoByteArraySize) +HdfsClient *HdfsClient::newInstance(NAHeap *heap, 
ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, int hdfsIoByteArraySizeInKB) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called."); @@ -390,7 +391,7 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HD retCode = hdfsClient->init(); if (retCode == HDFS_CLIENT_OK) { hdfsClient->setHdfsStats(hdfsStats); - hdfsClient->setIoByteArraySize(hdfsIoByteArraySize); + hdfsClient->setIoByteArraySize(hdfsIoByteArraySizeInKB); } else { NADELETE(hdfsClient, HdfsClient, heap); @@ -597,7 +598,7 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hd } Int64 lenRemain = len; Int64 writeLen; - Int64 chunkLen = (ioByteArraySize_ > 0 ? ioByteArraySize_ * 1024 : 0); + Int64 chunkLen = (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0); Int64 offset = 0; do { http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index a85c590..888451c 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ b/core/sql/executor/HdfsClient_JNI.h @@ -66,11 +66,11 @@ public: // Get the error description. 
static char* getErrorText(HDFS_Scan_RetCode errEnum); - static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize, + static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode); - HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize, + HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize); @@ -169,11 +169,11 @@ public: } ~HdfsClient(); - static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, short hdfsIoByteArraySize = 0); + static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, int hdfsIoByteArraySizeInKB = 0); static HdfsClient *getInstance(); static void deleteInstance(); - void setIoByteArraySize(short size) - { ioByteArraySize_ = size; } + void setIoByteArraySize(int size) + { ioByteArraySizeInKB_ = size; } // Get the error description. 
static char* getErrorText(HDFS_Client_RetCode errEnum); @@ -226,7 +226,7 @@ private: int numFiles_; char *path_; Int64 totalBytesWritten_; - short ioByteArraySize_; + Int32 ioByteArraySizeInKB_; ExHdfsScanStats *hdfsStats_; static jclass javaClass_; static JavaMethodInit* JavaMethods_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/nskgmake/comexe/Makefile ---------------------------------------------------------------------- diff --git a/core/sql/nskgmake/comexe/Makefile b/core/sql/nskgmake/comexe/Makefile index 6cc6c4e..4ae6e75 100755 --- a/core/sql/nskgmake/comexe/Makefile +++ b/core/sql/nskgmake/comexe/Makefile @@ -72,7 +72,8 @@ CPPSRC := CmpMessage.cpp \ udrtabledescinfo.cpp \ ComTrace.cpp \ ComTdbCancel.cpp \ - ComTdbHdfsScan.cpp + ComTdbHdfsScan.cpp \ + ComCompressionInfo.cpp CPPSRC += vers_libcomexe.cpp http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 5ffcd03..2f24dce 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -110,7 +110,7 @@ public class HDFSClient boolean compressed_ = false; private CompressionCodec codec_ = null; private short compressionType_; - private short ioByteArraySize_; + private int ioByteArraySizeInKB_; static { String confFile = System.getProperty("trafodion.log4j.configFile"); System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); @@ -148,7 +148,7 @@ public class HDFSClient int bytesRead; int totalBytesRead = 0; if (compressed_) { - bufArray_ = new byte[ioByteArraySize_ * 1024]; + bufArray_ = new byte[ioByteArraySizeInKB_ * 1024]; } else if (! 
buf_.hasArray()) { try { @@ -223,13 +223,13 @@ public class HDFSClient // The passed in length can never be more than the size of the buffer // If the range has a length more than the buffer length, the range is chunked // in HdfsScan - public HDFSClient(int bufNo, short ioByteArraySize, int rangeNo, String filename, ByteBuffer buffer, long position, + public HDFSClient(int bufNo, int ioByteArraySizeInKB, int rangeNo, String filename, ByteBuffer buffer, long position, int length, short compressionType, CompressionInputStream inStream) throws IOException { bufNo_ = bufNo; rangeNo_ = rangeNo; filename_ = filename; - ioByteArraySize_ = ioByteArraySize; + ioByteArraySizeInKB_ = ioByteArraySizeInKB; filepath_ = new Path(filename_); fs_ = FileSystem.get(filepath_.toUri(),config_); compressionType_ = compressionType; @@ -384,7 +384,7 @@ public class HDFSClient int bufLen; int bufOffset = 0; if (compressed_ && bufArray_ != null) - bufArray_ = new byte[ioByteArraySize_ * 1024]; + bufArray_ = new byte[ioByteArraySizeInKB_ * 1024]; if (buffer.hasArray()) bufLen = buffer.array().length; else http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java index b438fb2..48d5768 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java +++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java @@ -74,12 +74,11 @@ public class HdfsScan private int lastBufCompleted_ = -1; private boolean scanCompleted_; private CompressionInputStream currInStream_; - private short ioByteArraySize_; + private int ioByteArraySizeInKB_; // Structure to hold the Scan ranges for this HdfsScan instance // - class HdfsScanRange { String filename_; @@ -109,7 +108,7 @@ public class HdfsScan { } - public void setScanRanges(ByteBuffer buf1, ByteBuffer 
buf2, short ioByteArraySize, String filename[], long pos[], + public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, int ioByteArraySizeInKB, String filename[], long pos[], long len[], int rangeNum[], short compressionType[]) throws IOException { // Two buffers to hold the data read @@ -118,7 +117,7 @@ public class HdfsScan buf_[0] = buf1; buf_[1] = buf2; - ioByteArraySize_ = ioByteArraySize; + ioByteArraySizeInKB_ = ioByteArraySizeInKB; for (int i = 0; i < 2 ; i++) { if (buf_[i].hasArray()) bufLen_[i] = buf_[i].array().length; @@ -166,7 +165,7 @@ public class HdfsScan if (! scanCompleted_) { if (logger_.isDebugEnabled()) logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + currRangeLenRemain_ + " BufNo " + bufNo); - hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySize_, hdfsScanRanges_[currRange_].tdbRangeNum_, + hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySizeInKB_, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength, hdfsScanRanges_[currRange_].compressionType_, currInStream_);
