http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/nskgmake/tdm_sqlexp/Makefile ---------------------------------------------------------------------- diff --git a/core/sql/nskgmake/tdm_sqlexp/Makefile b/core/sql/nskgmake/tdm_sqlexp/Makefile index 6b973af..7213449 100755 --- a/core/sql/nskgmake/tdm_sqlexp/Makefile +++ b/core/sql/nskgmake/tdm_sqlexp/Makefile @@ -77,7 +77,7 @@ CPPSRC += exp_ieee.cpp CPPSRC += vers_libtdm_sqlexp.cpp -DEFS := -D_IEEE_FLOAT -DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H -D__STDC_CONSTANT_MACROS -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS +DEFS := -D_IEEE_FLOAT -DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H -D__STDC_CONSTANT_MACROS # LLVM_OBJ_DIR := $(LLVM)/$(SQ_BUILD_TYPE)/lib
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/HDFSHook.cpp ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/HDFSHook.cpp b/core/sql/optimizer/HDFSHook.cpp index c8793d9..4082679 100644 --- a/core/sql/optimizer/HDFSHook.cpp +++ b/core/sql/optimizer/HDFSHook.cpp @@ -895,7 +895,13 @@ NABoolean HHDFSTableStats::populate(struct hive_tbl_desc *htd) // put back fully qualified URI tableDir = hsd->location_; - // visit the directory + // get the fine-resolution timestamp before visiting + // the tree, to avoid losing any updates while this + // method is executing + computeModificationTSmsec(); + + if (diags_.isSuccess()) + // visit the directory processDirectory(tableDir, hsd->buckets_, hsd->isTrulyText(), hsd->getRecordTerminator()); @@ -1149,6 +1155,32 @@ void HHDFSTableStats::disconnectHDFS() // is dropped or the thread exits. } +void HHDFSTableStats::computeModificationTSmsec() +{ + if (modificationTSInMillisec_ <= 0) + { + HDFS_Client_RetCode rc; + + // get a millisecond-resolution timestamp via JNI + rc = HdfsClient::getHiveTableMaxModificationTs( + modificationTSInMillisec_, + tableDir_.data(), + numOfPartCols_); + // check for errors and timestamp mismatches + if (rc != HDFS_CLIENT_OK || modificationTSInMillisec_ <= 0) + { + NAString errMsg; + + errMsg.format("Error %d when reading msec timestamp for HDFS URL %s", + rc, + tableDir_.data()); + diags_.recordError(errMsg, "HHDFSTableStats::computeModificationTSmsec"); + modificationTSInMillisec_ = -1; + } + } + + return; +} OsimHHDFSStatsBase* HHDFSTableStats::osimSnapShot(NAMemory * heap) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/HDFSHook.h ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/HDFSHook.h b/core/sql/optimizer/HDFSHook.h index f6f64fa..1aec212 100644 --- a/core/sql/optimizer/HDFSHook.h +++ b/core/sql/optimizer/HDFSHook.h @@ -319,7 +319,11 @@ public: validationJTimestamp_(-1), listPartitionStatsList_(heap), hiveStatsSize_(0), - heap_(heap) {} + heap_(heap), + type_(UNKNOWN_), + modificationTSInMillisec_(-1) + {} + ~HHDFSTableStats(); const CollIndex entries() const { return listPartitionStatsList_.entries(); } @@ -393,6 +397,12 @@ public: const Lng32 numOfPartCols() const { return numOfPartCols_; } const Lng32 totalNumPartitions() const { return totalNumPartitions_; } + // finer-resolution timestamp for entire table + // (can remove this once we use JNI to collect this info + // for all HDFS files) + Int64 getModificationTSmsec() const { return modificationTSInMillisec_; } + void computeModificationTSmsec(); + private: enum FileType { @@ -443,8 +453,10 @@ private: HHDFSDiags diags_; NAMemory *heap_; - + FileType type_; + + Int64 modificationTSInMillisec_; }; #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/OptimizerSimulator.cpp ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/OptimizerSimulator.cpp b/core/sql/optimizer/OptimizerSimulator.cpp index 53af84d..d611098 100644 --- a/core/sql/optimizer/OptimizerSimulator.cpp +++ b/core/sql/optimizer/OptimizerSimulator.cpp @@ -55,7 +55,6 @@ #include "HBaseClient_JNI.h" #include "vproc.h" -#include "hdfs.h" #include "CmpSeabaseDDL.h" #include "ExExeUtilCli.h" #include "ComUser.h" http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/RelScan.h ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/RelScan.h b/core/sql/optimizer/RelScan.h index 58af7cd..cc36f79 100644 --- a/core/sql/optimizer/RelScan.h +++ b/core/sql/optimizer/RelScan.h @@ -882,7 +882,8 @@ public: char* &hdfsHostName, Int32 &hdfsPort, NABoolean &doMultiCursor, - NABoolean &doSplitFileOpt); + NABoolean &doSplitFileOpt, + NABoolean &isHdfsCompressed); static short genForOrc(Generator * generator, const HHDFSTableStats* hTabStats, const PartitioningFunction * mypart, http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/regress/hive/EXPECTED007 ---------------------------------------------------------------------- diff --git a/core/sql/regress/hive/EXPECTED007 b/core/sql/regress/hive/EXPECTED007 index 2162198..2b0fd34 100644 --- a/core/sql/regress/hive/EXPECTED007 +++ b/core/sql/regress/hive/EXPECTED007 @@ -956,7 +956,7 @@ create database hivesch0078; *** WARNING[8597] Statement was automatically retried 1 time(s). Delay before each retry was 0 seconds. See next entry for the error that caused this retry. -*** WARNING[8436] Mismatch detected between compiletime and runtime hive table definitions. DataModMismatchDetails: compiledModTS = 1508360788, failedModTS = -1, failedLoc = hdfs://localhost:25600/user/hive/warehouse/hivesch007.db/thive1 +*** WARNING[8577] Table, index, or view HIVE.HIVESCH007.THIVE1 was not found. --- 0 row(s) selected. >> http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/regress/hive/EXPECTED018 ---------------------------------------------------------------------- diff --git a/core/sql/regress/hive/EXPECTED018 b/core/sql/regress/hive/EXPECTED018 index d693a56..6f100f1 100644 --- a/core/sql/regress/hive/EXPECTED018 +++ b/core/sql/regress/hive/EXPECTED018 @@ -2376,7 +2376,7 @@ C_FIRST_NAME Task: UNLOAD Status: Started Task: EXTRACT Status: Started Time: 2018-02-15 18:12:42.125129 -*** ERROR[8447] An error occurred during hdfs access. Error Detail: Java exception in hdfsCreate(). java.io.IOException: No FileSystem for scheme: null +*** ERROR[8447] An error occurred during hdfs access. Error Detail: Java exception in HdfsClient::hdfsOpen(). java.io.IOException: No FileSystem for scheme: null org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584) org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/regress/hive/EXPECTED040 ---------------------------------------------------------------------- diff --git a/core/sql/regress/hive/EXPECTED040 b/core/sql/regress/hive/EXPECTED040 index 8fe3212..bb75200 100644 --- a/core/sql/regress/hive/EXPECTED040 +++ b/core/sql/regress/hive/EXPECTED040 @@ -252,7 +252,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 532 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 576 convertSkipListSize_ = 33, convertSkipList_ = 3 outputRowLength_ = 16 -Flag = 0xc +Flag = 0x20c Number of ranges to scan: 1 Number of esps to scan: 1 @@ -390,7 +390,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 8 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 16 convertSkipListSize_ = 33, convertSkipList_ = 3 outputRowLength_ = 16 -Flag = 0xc +Flag = 0x20c Number of ranges to scan: 1 Number of esps to scan: 1 @@ -546,7 +546,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 8 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 8 convertSkipListSize_ = 33, convertSkipList_ = 2 outputRowLength_ = 8 -Flag = 0xc +Flag = 0x20c Number of ranges to scan: 1 Number of esps to scan: 1 @@ -682,7 +682,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 8 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 8 convertSkipListSize_ = 33, convertSkipList_ = 2 outputRowLength_ = 8 -Flag = 0xc +Flag = 0x20c Number of ranges to scan: 1 Number of esps to scan: 1 http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 52453cc..ff78d3d 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -22,17 +22,21 @@ package org.trafodion.sql; import java.io.IOException; +import java.io.FileNotFoundException; import java.io.EOFException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.Logger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.FileStatus; import java.io.EOFException; import java.util.concurrent.Callable; @@ -41,6 +45,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -49,6 +54,17 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.util.ReflectionUtils; +// +// To read a range in a Hdfs file, use the constructor +// public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException +// +// For instance methods like hdfsListDirectory use the constructor +// public HDFSClient() +// +// For all static methods use +// HdfsClient::<static_method_name> +// + public class HDFSClient { static Logger logger_ = Logger.getLogger(HDFSClient.class.getName()); @@ -71,6 +87,9 @@ public class HDFSClient private int bytesRead_; private Future future_ = null; private int isEOF_ = 0; + private int totalBytesWritten_ = 0; + private Path filepath_ = null; + private boolean compression_; static { String confFile = System.getProperty("trafodion.log4j.configFile"); System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); @@ -89,6 +108,13 @@ public class HDFSClient System.loadLibrary("executor"); } + // The object instance that runs in the threadpool to read + // the requested chunk in the range + + // FSDataInputStream.read method may not read the requested length in one shot + // Loop to read the requested length or EOF is reached + // Requested length can never be larger than the buffer size + class HDFSRead implements Callable { HDFSRead() @@ -136,15 +162,19 @@ public class HDFSClient // This constructor enables the hdfs data to be read in another thread while the previously // read buffer is being processed by the SQL engine + // Opens the file and hands over the needed info to HdfsRead instance to read + // The passed in length can never be more than the size of the buffer + // If the range has a length more than the buffer length, the range is chunked + // in HdfsScan public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException { bufNo_ = bufNo; rangeNo_ = rangeNo; filename_ = filename; - Path filepath = new Path(filename_); - fs_ = FileSystem.get(filepath.toUri(),config_); - fsdis_ = fs_.open(filepath); - blockSize_ = (int)fs_.getDefaultBlockSize(filepath); + filepath_ = new Path(filename_); + fs_ = FileSystem.get(filepath_.toUri(),config_); + fsdis_ = fs_.open(filepath_); + blockSize_ = (int)fs_.getDefaultBlockSize(filepath_); buf_ = buffer; bufOffset_ = 0; pos_ = position; @@ -161,6 +191,10 @@ public class HDFSClient } } + // This method waits for the read to complete. Read can complete due to one of the following + // a) buffer is full + // b) EOF is reached + // c) An exception is encountered while reading the file public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException { Integer retObject = 0; @@ -168,6 +202,7 @@ public class HDFSClient retObject = (Integer)future_.get(); bytesRead = retObject.intValue(); fsdis_.close(); + fsdis_ = null; return bytesRead; } @@ -181,78 +216,127 @@ public class HDFSClient return isEOF_; } - boolean hdfsCreate(String fname , boolean compress) throws IOException + boolean hdfsCreate(String fname , boolean overwrite, boolean compress) throws IOException { - if (logger_.isDebugEnabled()) + if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsCreate() - started" ); - Path filePath = null; if (!compress || (compress && fname.endsWith(".gz"))) - filePath = new Path(fname); + filepath_ = new Path(fname); else - filePath = new Path(fname + ".gz"); + filepath_ = new Path(fname + ".gz"); - FileSystem fs = FileSystem.get(filePath.toUri(),config_); - FSDataOutputStream fsOut = fs.create(filePath, true); - - if (compress) { - GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); - Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); - outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); + fs_ = FileSystem.get(filepath_.toUri(),config_); + compression_ = compress; + fsdis_ = null; + FSDataOutputStream fsOut; + if (overwrite) + fsOut = fs_.create(filepath_); + else + if (fs_.exists(filepath_)) + fsOut = fs_.append(filepath_); + else + fsOut = fs_.create(filepath_); + + if (compression_) { + GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); + Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); + outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); } else - outStream_ = fsOut; - if (logger_.isDebugEnabled()) - logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" ); + outStream_ = fsOut; return true; - } + } - boolean hdfsOpen(String fname , boolean compress) throws IOException - { + boolean hdfsOpen(String fname , boolean compress) throws IOException + { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsOpen() - started" ); - Path filePath = null; if (!compress || (compress && fname.endsWith(".gz"))) - filePath = new Path(fname); + filepath_ = new Path(fname); else - filePath = new Path(fname + ".gz"); - - FileSystem fs = FileSystem.get(filePath.toUri(),config_); + filepath_ = new Path(fname + ".gz"); + fs_ = FileSystem.get(filepath_.toUri(),config_); + compression_ = compress; + outStream_ = null; + fsdis_ = null; + return true; + } + + int hdfsWrite(byte[] buff) throws IOException + { + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsWrite() - started" ); + FSDataOutputStream fsOut; - if (fs.exists(filePath)) - fsOut = fs.append(filePath); - else - fsOut = fs.create(filePath); + if (outStream_ == null) { + if (fs_.exists(filepath_)) + fsOut = fs_.append(filepath_); + else + fsOut = fs_.create(filepath_); - if (compress) { - GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); - Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); - outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); + if (compression_) { + GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); + Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); + outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); + } + else + outStream_ = fsOut; + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsWrite() - output stream created" ); } + outStream_.write(buff); + if (outStream_ instanceof FSDataOutputStream) + totalBytesWritten_ = ((FSDataOutputStream)outStream_).size(); else - outStream_ = fsOut; + totalBytesWritten_ += buff.length; if (logger_.isDebugEnabled()) - logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" ); - return true; + logger_.debug("HDFSClient.hdfsWrite() - bytes written " + totalBytesWritten_ ); + return totalBytesWritten_; } - - boolean hdfsWrite(byte[] buff, long len) throws IOException - { + int hdfsRead(ByteBuffer buffer) throws IOException + { if (logger_.isDebugEnabled()) - logger_.debug("HDFSClient.hdfsWrite() - started" ); - outStream_.write(buff); - outStream_.flush(); - if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - bytes written and flushed:" + len ); - return true; + logger_.debug("HDFSClient.hdfsRead() - started" ); + if (fsdis_ == null) { + fsdis_ = fs_.open(filepath_); + pos_ = 0; + } + int lenRemain; + int bytesRead; + int totalBytesRead = 0; + int bufLen; + int bufOffset = 0; + if (buffer.hasArray()) + bufLen = buffer.array().length; + else + bufLen = buffer.capacity(); + lenRemain = bufLen; + do + { + if (buffer.hasArray()) + bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain); + else + bytesRead = fsdis_.read(buffer); + if (bytesRead == -1 || bytesRead == 0) + break; + totalBytesRead += bytesRead; + pos_ += bytesRead; + lenRemain -= bytesRead; + } while (lenRemain > 0); + return totalBytesRead; } boolean hdfsClose() throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" ); if (outStream_ != null) { + outStream_.flush(); outStream_.close(); outStream_ = null; } + if (fsdis_ != null) + fsdis_.close(); return true; } @@ -380,6 +464,25 @@ public class HDFSClient else return 0; } + + + public void stop() throws IOException + { + if (future_ != null) { + try { + future_.get(200, TimeUnit.MILLISECONDS); + } catch(TimeoutException e) { + logger_.error("Asynchronous Thread of HdfsScan is Cancelled (timeout), ", e); + future_.cancel(true); + } catch(InterruptedException e) { + logger_.error("Asynchronous Thread of HdfsScan is Cancelled (interrupt), ", e); + future_.cancel(true); // Interrupt the thread + } catch (ExecutionException ee) + { + } + future_ = null; + } + } public static void shutdown() throws InterruptedException { @@ -387,6 +490,103 @@ public class HDFSClient executorService_.shutdown(); } + private static FileSystem getFileSystem() throws IOException + { + return defaultFs_; + } + + // if levelDeep = 0, return the max modification timestamp of the passed-in HDFS URIs + // (a tab-separated list of 0 or more paths) + // if levelDeep > 0, also check all directories "levelDeep" levels below. Exclude + // directories that start with a dot (hidden directories) + public static long getHiveTableMaxModificationTs( String stableDirPaths, int levelDeep) throws FileNotFoundException, IOException + { + long result = 0; + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient:getHiveTableMaxModificationTs enter"); + + String[] tableDirPaths = stableDirPaths.split("\t"); + // account for root dir + for (int i=0; i<tableDirPaths.length; i++) { + FileStatus r = getFileSystem().getFileStatus(new Path(tableDirPaths[i]));// super fast API, return in .2ms + if (r != null && r.getModificationTime() > result) + result = r.getModificationTime(); + } + + if (levelDeep>0) + { + Path[] paths = new Path[tableDirPaths.length]; + for (int i=0; i<tableDirPaths.length; i++) + paths[i] = new Path(tableDirPaths[i]); + long l = getHiveTableMaxModificationTs2(paths,levelDeep); + if (l > result) + result = l; + } + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient:getHiveTableMaxModificationTs "+stableDirPaths+" levelDeep"+levelDeep+":"+result); + return result; + } + + private static long getHiveTableMaxModificationTs2(Path[] paths, int levelDeep)throws FileNotFoundException, IOException + { + long result = 0; + PathFilter filter = new PathFilter(){ + public boolean accept(Path file){ + return !file.getName().startsWith(".");//filter out hidden files and directories + } + }; + FileStatus[] fileStatuss=null; + if (levelDeep == 1){ // stop condition on recursive function + //check parent level (important for deletes): + for (Path path : paths){ + FileStatus r = getFileSystem().getFileStatus(path);// super fast API, return in .2ms + if (r != null && r.getModificationTime()>result) + result = r.getModificationTime(); + } + if (paths.length==1) + fileStatuss = getFileSystem().listStatus(paths[0],filter);// minor optimization. avoid using list based API when not needed + else + fileStatuss = getFileSystem().listStatus(paths,filter); + for(int i=0;i<fileStatuss.length;i++) + if (fileStatuss[i].isDirectory() && fileStatuss[i].getModificationTime()>result) + result = fileStatuss[i].getModificationTime(); + }else{//here levelDeep >1 + List<Path> pathList = new ArrayList<Path>(); + if (paths.length==1) + fileStatuss = getFileSystem().listStatus(paths[0],filter);// minor optimization. avoid using list based API when not needed + else + fileStatuss = getFileSystem().listStatus(paths,filter); + for(int i=0;i<fileStatuss.length;i++) + if (fileStatuss[i].isDirectory()) + { + pathList.add(fileStatuss[i].getPath()); + if (fileStatuss[i].getModificationTime()>result) + result = fileStatuss[i].getModificationTime();// make sure level n-1 is accounted for for delete partition case + } + long l = getHiveTableMaxModificationTs2(pathList.toArray(new Path[pathList.size()]),levelDeep-1); + if (l>result) result = l; + + } + return result; + } + + public static String getFsDefaultName() + { + String uri = config_.get("fs.defaultFS"); + return uri; + } + + + public static boolean hdfsCreateDirectory(String pathStr) throws IOException + { + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsCreateDirectory()" + pathStr); + Path dirPath = new Path(pathStr ); + FileSystem fs = FileSystem.get(dirPath.toUri(), config_); + fs.mkdirs(dirPath); + return true; + } + private native int sendFileStatus(long jniObj, int numFiles, int fileNo, boolean isDir, String filename, long modTime, long len, short numReplicas, long blockSize, String owner, String group, http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java index e216555..99f021d 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java +++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java @@ -21,17 +21,6 @@ package org.trafodion.sql; -// This class implements an efficient mechanism to read hdfs files -// Trafodion ExHdfsScan operator provides a range of scans to be performed. -// The range consists of a hdfs filename, offset and length to be read -// This class takes in two ByteBuffers. These ByteBuffer can be either direct buffers -// backed up native buffers or indirect buffer backed by java arrays. -// All the ranges are read alternating between the two buffers using ExecutorService -// using CachedThreadPool mechanism. -// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read -// the next full or partial buffer while the main thread processes the previously -// read information from the other buffer - import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.Logger; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +43,24 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.fs.FileStatus; import java.net.URI; +// This class implements an efficient mechanism to read hdfs files +// Trafodion ExHdfsScan operator provides a range of scans to be performed. +// The range consists of a hdfs filename, offset and length to be read +// This class takes in two ByteBuffers. These ByteBuffer can be either direct buffers +// backed up native buffers or indirect buffer backed by java arrays. +// All the ranges are read alternating between the two buffers using ExecutorService +// using CachedThreadPool mechanism. + +// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read +// the next full or partial buffer while the main thread processes the previously +// read information from the other buffer +// HdfsScan picks up a range and schedules a read into a next available buffer. +// If the range is more than the buffer size, then the range is read into multiple +// chunks and schedules one chunk at a time alternatiing the buffers +// Once the range is completed, the next range in the HdfsScanRange is picked up +// for scheduling, till all the ranges assigned to the HdfsScan instance are read. + + public class HdfsScan { static Logger logger_ = Logger.getLogger(HdfsScan.class.getName()); @@ -61,10 +68,13 @@ public class HdfsScan private int bufLen_[]; private HDFSClient hdfsClient_[]; private int currRange_; - private long currPos_; - private long lenRemain_; + private long currRangePos_; + private long currRangeLenRemain_; private int lastBufCompleted_ = -1; private boolean scanCompleted_; + + // Structure to hold the Scan ranges for this HdfsScan instance + // class HdfsScanRange { @@ -95,6 +105,7 @@ public class HdfsScan public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException { + // Two buffers to hold the data read buf_ = new ByteBuffer[2]; bufLen_ = new int[2]; @@ -114,39 +125,47 @@ public class HdfsScan } if (hdfsScanRanges_.length > 0) { currRange_ = 0; - currPos_ = hdfsScanRanges_[currRange_].pos_; - lenRemain_ = hdfsScanRanges_[currRange_].len_; - hdfsScanRange(0, 0); + currRangePos_ = hdfsScanRanges_[currRange_].pos_; + currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; + scheduleHdfsScanRange(0, 0); } scanCompleted_ = false; } - public void hdfsScanRange(int bufNo, int bytesCompleted) throws IOException + public void scheduleHdfsScanRange(int bufNo, int bytesCompleted) throws IOException { - lenRemain_ -= bytesCompleted; - currPos_ += bytesCompleted; + currRangeLenRemain_ -= bytesCompleted; + currRangePos_ += bytesCompleted; int readLength; - if (lenRemain_ <= 0) { + if (currRangeLenRemain_ <= 0) { if (currRange_ == (hdfsScanRanges_.length-1)) { scanCompleted_ = true; return; } else { currRange_++; - currPos_ = hdfsScanRanges_[currRange_].pos_; - lenRemain_ = hdfsScanRanges_[currRange_].len_; + currRangePos_ = hdfsScanRanges_[currRange_].pos_; + currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; } } - if (lenRemain_ > bufLen_[bufNo]) + if (currRangeLenRemain_ > bufLen_[bufNo]) readLength = bufLen_[bufNo]; else - readLength = (int)lenRemain_; + readLength = (int)currRangeLenRemain_; if (! scanCompleted_) { if (logger_.isDebugEnabled()) - logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo); - hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength); + logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + currRangeLenRemain_ + " BufNo " + bufNo); + hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength); } } + +/* + Method to wait for completion of the scheduled read of a chunk in a range + Returns 4 items, bytes read, buf no, range no, is EOF + If there are more chunks to be read in the range, schedules a read into the other buffer + If EOF is reached or the full range is read, the next range is picked up for + scheduling +*/ public int[] trafHdfsRead() throws IOException, InterruptedException, ExecutionException { @@ -164,12 +183,14 @@ public class HdfsScan switch (lastBufCompleted_) { case -1: case 1: + // Wait for the read to complete in buffer 0 bytesRead = hdfsClient_[0].trafHdfsReadBuffer(); bufNo = 0; rangeNo = hdfsClient_[0].getRangeNo(); isEOF = hdfsClient_[0].isEOF(); break; case 0: + // Wait for the read to complete in buffer 1 bytesRead = hdfsClient_[1].trafHdfsReadBuffer(); bufNo = 1; rangeNo = hdfsClient_[1].getRangeNo(); @@ -194,18 +215,20 @@ public class HdfsScan return retArray; } else { currRange_++; - currPos_ = hdfsScanRanges_[currRange_].pos_; - lenRemain_ = hdfsScanRanges_[currRange_].len_; + currRangePos_ = hdfsScanRanges_[currRange_].pos_; + currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; bytesRead = 0; } } switch (lastBufCompleted_) { case 0: - hdfsScanRange(1, bytesRead); + // schedule the next chunk or next range to be read in buffer 1 + scheduleHdfsScanRange(1, bytesRead); break; case 1: - hdfsScanRange(0, bytesRead); + // schedule the next chunk or next range to be read in buffer 0 + scheduleHdfsScanRange(0, bytesRead); break; default: break; @@ -213,10 +236,22 @@ public class HdfsScan return retArray; } + public void stop() throws IOException + { + if (hdfsClient_[0] != null) + hdfsClient_[0].stop(); + if (hdfsClient_[1] != null) + hdfsClient_[1].stop(); + hdfsClient_[0] = null; + hdfsClient_[1] = null; + return; + } + public static void shutdown() throws InterruptedException { HDFSClient.shutdown(); } + public static void main(String[] args) throws Exception {
