http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/nskgmake/tdm_sqlexp/Makefile
----------------------------------------------------------------------
diff --git a/core/sql/nskgmake/tdm_sqlexp/Makefile 
b/core/sql/nskgmake/tdm_sqlexp/Makefile
index 6b973af..7213449 100755
--- a/core/sql/nskgmake/tdm_sqlexp/Makefile
+++ b/core/sql/nskgmake/tdm_sqlexp/Makefile
@@ -77,7 +77,7 @@ CPPSRC += exp_ieee.cpp
 
 
 CPPSRC += vers_libtdm_sqlexp.cpp
-DEFS := -D_IEEE_FLOAT -DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H 
-D__STDC_CONSTANT_MACROS -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS
+DEFS := -D_IEEE_FLOAT -DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H 
-D__STDC_CONSTANT_MACROS 
 
 #
 LLVM_OBJ_DIR := $(LLVM)/$(SQ_BUILD_TYPE)/lib

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/HDFSHook.cpp
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/HDFSHook.cpp b/core/sql/optimizer/HDFSHook.cpp
index c8793d9..4082679 100644
--- a/core/sql/optimizer/HDFSHook.cpp
+++ b/core/sql/optimizer/HDFSHook.cpp
@@ -895,7 +895,13 @@ NABoolean HHDFSTableStats::populate(struct hive_tbl_desc 
*htd)
       // put back fully qualified URI
       tableDir = hsd->location_;
 
-      // visit the directory
+      // get the fine-resolution timestamp before visiting
+      // the tree, to avoid losing any updates while this
+      // method is executing
+      computeModificationTSmsec();
+
+      if (diags_.isSuccess())
+        // visit the directory
       processDirectory(tableDir, hsd->buckets_, 
                        hsd->isTrulyText(), 
                        hsd->getRecordTerminator());
@@ -1149,6 +1155,32 @@ void HHDFSTableStats::disconnectHDFS()
   // is dropped or the thread exits.
 }
 
+void HHDFSTableStats::computeModificationTSmsec()
+{
+  if (modificationTSInMillisec_ <= 0)
+    {
+      HDFS_Client_RetCode rc;
+
+      // get a millisecond-resolution timestamp via JNI
+      rc = HdfsClient::getHiveTableMaxModificationTs(
+               modificationTSInMillisec_,
+               tableDir_.data(),
+               numOfPartCols_);
+      // check for errors and timestamp mismatches
+      if (rc != HDFS_CLIENT_OK || modificationTSInMillisec_ <= 0)
+        {
+          NAString errMsg;
+
+          errMsg.format("Error %d when reading msec timestamp for HDFS URL %s",
+                        rc,
+                        tableDir_.data());
+          diags_.recordError(errMsg, 
"HHDFSTableStats::computeModificationTSmsec");
+          modificationTSInMillisec_ = -1;
+        }
+    }
+
+  return;
+}
 
 OsimHHDFSStatsBase* HHDFSTableStats::osimSnapShot(NAMemory * heap)
 {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/HDFSHook.h
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/HDFSHook.h b/core/sql/optimizer/HDFSHook.h
index f6f64fa..1aec212 100644
--- a/core/sql/optimizer/HDFSHook.h
+++ b/core/sql/optimizer/HDFSHook.h
@@ -319,7 +319,11 @@ public:
                                     validationJTimestamp_(-1),
                                     listPartitionStatsList_(heap),
                                     hiveStatsSize_(0),
-                                    heap_(heap) {}
+                                    heap_(heap),
+                                    type_(UNKNOWN_),
+                                    modificationTSInMillisec_(-1)
+  {}
+
   ~HHDFSTableStats();
 
   const CollIndex entries() const          { return 
listPartitionStatsList_.entries(); }
@@ -393,6 +397,12 @@ public:
   const Lng32 numOfPartCols() const { return numOfPartCols_; }
   const Lng32 totalNumPartitions() const { return totalNumPartitions_; }
 
+  // finer-resolution timestamp for entire table
+  // (can remove this once we use JNI to collect this info
+  // for all HDFS files)
+  Int64 getModificationTSmsec() const { return modificationTSInMillisec_; }
+  void computeModificationTSmsec();
+
 private:
   enum FileType
   {
@@ -443,8 +453,10 @@ private:
   HHDFSDiags diags_;
 
   NAMemory *heap_;
-
+ 
   FileType type_;
+
+  Int64 modificationTSInMillisec_;
 };
 
 #endif

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/OptimizerSimulator.cpp
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/OptimizerSimulator.cpp 
b/core/sql/optimizer/OptimizerSimulator.cpp
index 53af84d..d611098 100644
--- a/core/sql/optimizer/OptimizerSimulator.cpp
+++ b/core/sql/optimizer/OptimizerSimulator.cpp
@@ -55,7 +55,6 @@
 #include "HBaseClient_JNI.h"
 
 #include "vproc.h"
-#include "hdfs.h"
 #include "CmpSeabaseDDL.h"
 #include "ExExeUtilCli.h"
 #include "ComUser.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/optimizer/RelScan.h
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/RelScan.h b/core/sql/optimizer/RelScan.h
index 58af7cd..cc36f79 100644
--- a/core/sql/optimizer/RelScan.h
+++ b/core/sql/optimizer/RelScan.h
@@ -882,7 +882,8 @@ public:
                              char* &hdfsHostName,
                              Int32 &hdfsPort,
                              NABoolean &doMultiCursor,
-                             NABoolean &doSplitFileOpt);
+                             NABoolean &doSplitFileOpt,
+                             NABoolean &isHdfsCompressed);
   static short genForOrc(Generator * generator,
                          const HHDFSTableStats* hTabStats,
                          const PartitioningFunction * mypart,

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/regress/hive/EXPECTED007
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/EXPECTED007 
b/core/sql/regress/hive/EXPECTED007
index 2162198..2b0fd34 100644
--- a/core/sql/regress/hive/EXPECTED007
+++ b/core/sql/regress/hive/EXPECTED007
@@ -956,7 +956,7 @@ create database hivesch0078;
 
 *** WARNING[8597] Statement was automatically retried 1 time(s). Delay before 
each retry was 0 seconds. See next entry for the error that caused this retry.
 
-*** WARNING[8436] Mismatch detected between compiletime and runtime hive table 
definitions. DataModMismatchDetails: compiledModTS = 1508360788, failedModTS = 
-1, failedLoc = hdfs://localhost:25600/user/hive/warehouse/hivesch007.db/thive1
+*** WARNING[8577] Table, index, or view HIVE.HIVESCH007.THIVE1 was not found.
 
 --- 0 row(s) selected.
 >>

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/regress/hive/EXPECTED018
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/EXPECTED018 
b/core/sql/regress/hive/EXPECTED018
index d693a56..6f100f1 100644
--- a/core/sql/regress/hive/EXPECTED018
+++ b/core/sql/regress/hive/EXPECTED018
@@ -2376,7 +2376,7 @@ C_FIRST_NAME
 Task: UNLOAD           Status: Started
 Task:  EXTRACT         Status: Started    Time: 2018-02-15 18:12:42.125129
 
-*** ERROR[8447] An error occurred during hdfs access. Error Detail: Java 
exception in hdfsCreate(). java.io.IOException: No FileSystem for scheme: null
+*** ERROR[8447] An error occurred during hdfs access. Error Detail: Java 
exception in HdfsClient::hdfsOpen(). java.io.IOException: No FileSystem for 
scheme: null
 org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/regress/hive/EXPECTED040
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/EXPECTED040 
b/core/sql/regress/hive/EXPECTED040
index 8fe3212..bb75200 100644
--- a/core/sql/regress/hive/EXPECTED040
+++ b/core/sql/regress/hive/EXPECTED040
@@ -252,7 +252,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 532
 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 576
 convertSkipListSize_ = 33, convertSkipList_ = 3
 outputRowLength_ = 16
-Flag = 0xc
+Flag = 0x20c
 
 Number of ranges to scan: 1
 Number of esps to scan:    1
@@ -390,7 +390,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 8
 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 16
 convertSkipListSize_ = 33, convertSkipList_ = 3
 outputRowLength_ = 16
-Flag = 0xc
+Flag = 0x20c
 
 Number of ranges to scan: 1
 Number of esps to scan:    1
@@ -546,7 +546,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 8
 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 8
 convertSkipListSize_ = 33, convertSkipList_ = 2
 outputRowLength_ = 8
-Flag = 0xc
+Flag = 0x20c
 
 Number of ranges to scan: 1
 Number of esps to scan:    1
@@ -682,7 +682,7 @@ asciiTuppIndex_ = 4, asciiRowLen_ = 8
 moveExprColsTuppIndex_ = 2, moveExprColsRowLength_ = 8
 convertSkipListSize_ = 33, convertSkipList_ = 2
 outputRowLength_ = 8
-Flag = 0xc
+Flag = 0x20c
 
 Number of ranges to scan: 1
 Number of esps to scan:    1

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 52453cc..ff78d3d 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -22,17 +22,21 @@
 package org.trafodion.sql;
 
 import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.io.EOFException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.Logger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FileStatus;
 import java.io.EOFException;
 import java.util.concurrent.Callable;
@@ -41,6 +45,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -49,6 +54,17 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.util.ReflectionUtils;
 
+//
+//  To read a range in a Hdfs file, use the constructor
+//   public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer 
buffer, long position, int length) throws IOException
+// 
+//  For instance methods like hdfsListDirectory use the constructor
+//     public HDFSClient()
+//
+//  For all static methods use
+//     HdfsClient::<static_method_name>
+//
+
 public class HDFSClient 
 {
    static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
@@ -71,6 +87,9 @@ public class HDFSClient
    private int bytesRead_;
    private Future future_ = null;
    private int isEOF_ = 0; 
+   private int totalBytesWritten_ = 0;
+   private Path filepath_ = null;
+   private boolean compression_;
    static {
       String confFile = System.getProperty("trafodion.log4j.configFile");
       System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -89,6 +108,13 @@ public class HDFSClient
       System.loadLibrary("executor");
    }
 
+   // The object instance that runs in the threadpool to read
+   // the requested chunk in the range
+
+   // FSDataInputStream.read method may not read the requested length in one 
shot
+   // Loop until the requested length is read or EOF is reached 
+   // Requested length can never be larger than the buffer size
+
    class HDFSRead implements Callable 
    {
       HDFSRead() 
@@ -136,15 +162,19 @@ public class HDFSClient
 
    // This constructor enables the hdfs data to be read in another thread 
while the previously 
    // read buffer is being processed by the SQL engine 
+   // Opens the file and hands over the needed info to the HDFSRead instance to 
read 
+   // The passed in length can never be more than the size of the buffer
+   // If the range has a length more than the buffer length, the range is 
chunked
+   // in HdfsScan
    public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer 
buffer, long position, int length) throws IOException
    {
       bufNo_ = bufNo; 
       rangeNo_ = rangeNo;
       filename_ = filename;
-      Path filepath = new Path(filename_);
-      fs_ = FileSystem.get(filepath.toUri(),config_);
-      fsdis_ = fs_.open(filepath);
-      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
+      filepath_ = new Path(filename_);
+      fs_ = FileSystem.get(filepath_.toUri(),config_);
+      fsdis_ = fs_.open(filepath_);
+      blockSize_ = (int)fs_.getDefaultBlockSize(filepath_);
       buf_  = buffer;
       bufOffset_ = 0;
       pos_ = position;
@@ -161,6 +191,10 @@ public class HDFSClient
       }
    }
 
+  //  This method waits for the read to complete. Read can complete due to one 
of the following
+  //  a) buffer is full
+  //  b) EOF is reached
+  //  c) An exception is encountered while reading the file
    public int trafHdfsReadBuffer() throws IOException, InterruptedException, 
ExecutionException
    {
       Integer retObject = 0;
@@ -168,6 +202,7 @@ public class HDFSClient
       retObject = (Integer)future_.get();
       bytesRead = retObject.intValue();
       fsdis_.close();
+      fsdis_ = null;
       return bytesRead;
    }  
 
@@ -181,78 +216,127 @@ public class HDFSClient
       return isEOF_;
    }
 
-   boolean hdfsCreate(String fname , boolean compress) throws IOException
+   boolean hdfsCreate(String fname , boolean overwrite, boolean compress) 
throws IOException
    {
-     if (logger_.isDebugEnabled()) 
+      if (logger_.isDebugEnabled()) 
         logger_.debug("HDFSClient.hdfsCreate() - started" );
-      Path filePath = null;
       if (!compress || (compress && fname.endsWith(".gz")))
-        filePath = new Path(fname);
+        filepath_ = new Path(fname);
       else
-        filePath = new Path(fname + ".gz");
+        filepath_ = new Path(fname + ".gz");
         
-      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
-      FSDataOutputStream fsOut = fs.create(filePath, true);
-      
-      if (compress) {
-        GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( 
GzipCodec.class, config_);
-        Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
-        outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
+      fs_ = FileSystem.get(filepath_.toUri(),config_);
+      compression_ = compress;
+      fsdis_ = null;      
+      FSDataOutputStream fsOut;
+      if (overwrite)
+         fsOut = fs_.create(filepath_);
+      else
+      if (fs_.exists(filepath_))
+         fsOut = fs_.append(filepath_);
+      else
+         fsOut = fs_.create(filepath_);
+
+      if (compression_) {
+          GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( 
GzipCodec.class, config_);
+          Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
+          outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
       }
       else
-        outStream_ = fsOut;      
-      if (logger_.isDebugEnabled()) 
-         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream 
created" );
+         outStream_ = fsOut;
       return true;
-    }
+   }
 
-    boolean hdfsOpen(String fname , boolean compress) throws IOException
-    {
+   boolean hdfsOpen(String fname , boolean compress) throws IOException
+   {
       if (logger_.isDebugEnabled()) 
          logger_.debug("HDFSClient.hdfsOpen() - started" );
-      Path filePath = null;
       if (!compress || (compress && fname.endsWith(".gz")))
-        filePath = new Path(fname);
+        filepath_ = new Path(fname);
       else
-        filePath = new Path(fname + ".gz");
-        
-      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
+        filepath_ = new Path(fname + ".gz");
+      fs_ = FileSystem.get(filepath_.toUri(),config_);
+      compression_ = compress;  
+      outStream_ = null;
+      fsdis_ = null;      
+      return true;
+    }
+    
+    int hdfsWrite(byte[] buff) throws IOException
+    {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsWrite() - started" );
+
       FSDataOutputStream fsOut;
-      if (fs.exists(filePath))
-         fsOut = fs.append(filePath);
-      else
-         fsOut = fs.create(filePath);
+      if (outStream_ == null) {
+         if (fs_.exists(filepath_))
+            fsOut = fs_.append(filepath_);
+         else
+            fsOut = fs_.create(filepath_);
       
-      if (compress) {
-        GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( 
GzipCodec.class, config_);
-        Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
-        outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
+         if (compression_) {
+            GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( 
GzipCodec.class, config_);
+            Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
+            outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
+         }
+         else
+            outStream_ = fsOut;      
+         if (logger_.isDebugEnabled()) 
+            logger_.debug("HDFSClient.hdfsWrite() - output stream created" );
       }
+      outStream_.write(buff);
+      if (outStream_ instanceof FSDataOutputStream)
+         totalBytesWritten_ = ((FSDataOutputStream)outStream_).size();
       else
-        outStream_ = fsOut;      
+         totalBytesWritten_ += buff.length; 
       if (logger_.isDebugEnabled()) 
-         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream 
created" );
-      return true;
+         logger_.debug("HDFSClient.hdfsWrite() - bytes written " + 
totalBytesWritten_ );
+      return totalBytesWritten_;
     }
-    
-    boolean hdfsWrite(byte[] buff, long len) throws IOException
-    {
 
+    int hdfsRead(ByteBuffer buffer) throws IOException
+    {
       if (logger_.isDebugEnabled()) 
-         logger_.debug("HDFSClient.hdfsWrite() - started" );
-      outStream_.write(buff);
-      outStream_.flush();
-      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - 
bytes written and flushed:" + len  );
-      return true;
+         logger_.debug("HDFSClient.hdfsRead() - started" );
+      if (fsdis_ == null) {
+         fsdis_ = fs_.open(filepath_);
+         pos_ = 0;
+      }
+      int lenRemain;   
+      int bytesRead;
+      int totalBytesRead = 0;
+      int bufLen;
+      int bufOffset = 0;
+      if (buffer.hasArray())
+         bufLen = buffer.array().length;
+      else
+         bufLen = buffer.capacity();
+      lenRemain = bufLen;
+      do
+      {
+         if (buffer.hasArray())
+            bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, 
lenRemain);
+         else
+            bytesRead = fsdis_.read(buffer);    
+         if (bytesRead == -1 || bytesRead == 0)
+            break;    
+         totalBytesRead += bytesRead;
+         pos_ += bytesRead;
+         lenRemain -= bytesRead;
+      } while (lenRemain > 0);
+      return totalBytesRead;
     }
     
     boolean hdfsClose() throws IOException
     {
       if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - 
started" );
       if (outStream_ != null) {
+          outStream_.flush();
           outStream_.close();
           outStream_ = null;
       }
+      if (fsdis_ != null)
+         fsdis_.close();
       return true;
     }
 
@@ -380,6 +464,25 @@ public class HDFSClient
       else  
          return 0;
    }
+
+
+   public void stop() throws IOException
+   {
+      if (future_ != null) {
+         try {
+           future_.get(200, TimeUnit.MILLISECONDS);
+         } catch(TimeoutException e) {
+            logger_.error("Asynchronous Thread of HdfsScan is Cancelled 
(timeout), ", e);
+            future_.cancel(true);
+        } catch(InterruptedException e) {
+            logger_.error("Asynchronous Thread of HdfsScan is Cancelled 
(interrupt), ", e);
+            future_.cancel(true); // Interrupt the thread
+        } catch (ExecutionException ee)
+        {
+        }
+        future_ = null;
+      }
+   }
  
    public static void shutdown() throws InterruptedException
    {
@@ -387,6 +490,103 @@ public class HDFSClient
       executorService_.shutdown();
    }
    
+   private static FileSystem getFileSystem() throws IOException
+   {
+       return defaultFs_;
+   }
+
+   // if levelDeep = 0, return the max modification timestamp of the passed-in 
HDFS URIs
+   // (a tab-separated list of 0 or more paths)
+   // if levelDeep > 0, also check all directories "levelDeep" levels below. 
Exclude
+   // directories that start with a dot (hidden directories)
+   public static long getHiveTableMaxModificationTs( String stableDirPaths, 
int levelDeep) throws FileNotFoundException, IOException
+   {
+       long result = 0;
+       if (logger_.isDebugEnabled())
+          logger_.debug("HDFSClient:getHiveTableMaxModificationTs enter");
+
+       String[] tableDirPaths = stableDirPaths.split("\t");
+       // account for root dir
+       for (int i=0; i<tableDirPaths.length; i++) {
+           FileStatus r = getFileSystem().getFileStatus(new 
Path(tableDirPaths[i]));// super fast API, return in .2ms
+           if (r != null && r.getModificationTime() > result)
+               result = r.getModificationTime();
+       }
+
+       if (levelDeep>0)
+       {
+           Path[] paths = new Path[tableDirPaths.length];
+           for (int i=0; i<tableDirPaths.length; i++)
+               paths[i] = new Path(tableDirPaths[i]);
+           long l = getHiveTableMaxModificationTs2(paths,levelDeep);
+           if (l > result)
+              result = l;
+       }
+       if (logger_.isDebugEnabled())
+           logger_.debug("HDFSClient:getHiveTableMaxModificationTs 
"+stableDirPaths+" levelDeep"+levelDeep+":"+result);
+       return result;
+   }
+
+   private static long getHiveTableMaxModificationTs2(Path[] paths, int 
levelDeep)throws FileNotFoundException, IOException
+   {
+       long result = 0;
+       PathFilter filter = new PathFilter(){
+           public boolean accept(Path file){
+             return !file.getName().startsWith(".");//filter out hidden files 
and directories
+           }
+       };
+       FileStatus[] fileStatuss=null;
+       if (levelDeep == 1){ // stop condition on recursive function
+           //check parent level (important for deletes):
+           for (Path path : paths){
+               FileStatus r = getFileSystem().getFileStatus(path);// super 
fast API, return in .2ms
+               if (r != null && r.getModificationTime()>result)
+                   result = r.getModificationTime();
+           }
+           if (paths.length==1)
+               fileStatuss = getFileSystem().listStatus(paths[0],filter);// 
minor optimization. avoid using list based API when not needed
+           else
+               fileStatuss = getFileSystem().listStatus(paths,filter);
+           for(int i=0;i<fileStatuss.length;i++)
+               if (fileStatuss[i].isDirectory() && 
fileStatuss[i].getModificationTime()>result)
+                   result = fileStatuss[i].getModificationTime();
+       }else{//here levelDeep >1
+           List<Path> pathList = new ArrayList<Path>();
+           if (paths.length==1)
+               fileStatuss = getFileSystem().listStatus(paths[0],filter);// 
minor optimization. avoid using list based API when not needed
+           else
+               fileStatuss = getFileSystem().listStatus(paths,filter);
+           for(int i=0;i<fileStatuss.length;i++)
+               if (fileStatuss[i].isDirectory())
+               {
+                   pathList.add(fileStatuss[i].getPath());
+                   if (fileStatuss[i].getModificationTime()>result)
+                       result = fileStatuss[i].getModificationTime();// make 
sure level n-1 is accounted for for delete partition case
+               }
+           long l = getHiveTableMaxModificationTs2(pathList.toArray(new 
Path[pathList.size()]),levelDeep-1);
+           if (l>result) result = l;
+
+       }
+     return result;
+   }
+
+   public static String getFsDefaultName()
+   {
+      String uri = config_.get("fs.defaultFS");
+      return uri;
+   }
+
+
+   public static boolean hdfsCreateDirectory(String pathStr) throws IOException
+   {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsCreateDirectory()" + pathStr);
+      Path dirPath = new Path(pathStr );
+      FileSystem fs = FileSystem.get(dirPath.toUri(), config_);
+      fs.mkdirs(dirPath);
+      return true;
+   }
+
    private native int sendFileStatus(long jniObj, int numFiles, int fileNo, 
boolean isDir, 
                         String filename, long modTime, long len,
                         short numReplicas, long blockSize, String owner, 
String group,

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java 
b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index e216555..99f021d 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -21,17 +21,6 @@
 
 package org.trafodion.sql;
 
-// This class implements an efficient mechanism to read hdfs files
-// Trafodion ExHdfsScan operator provides a range of scans to be performed.
-// The range consists of a hdfs filename, offset and length to be read
-// This class takes in two ByteBuffers. These ByteBuffer can be either direct 
buffers
-// backed up native buffers or indirect buffer backed by java arrays.
-// All the ranges are read alternating between the two buffers using 
ExecutorService
-// using CachedThreadPool mechanism. 
-// For a given HdfsScan instance, only one thread(IO thread) is scheduled to 
read
-// the next full or partial buffer while the main thread processes the 
previously
-// read information from the other buffer
-
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.Logger;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +43,24 @@ import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.fs.FileStatus;
 import java.net.URI;
 
+// This class implements an efficient mechanism to read hdfs files
+// Trafodion ExHdfsScan operator provides a range of scans to be performed.
+// The range consists of a hdfs filename, offset and length to be read
+// This class takes in two ByteBuffers. These ByteBuffer can be either direct 
buffers
+// backed up native buffers or indirect buffer backed by java arrays.
+// All the ranges are read alternating between the two buffers using 
ExecutorService
+// using CachedThreadPool mechanism.
+
+// For a given HdfsScan instance, only one thread(IO thread) is scheduled to 
read
+// the next full or partial buffer while the main thread processes the 
previously
+// read information from the other buffer
+// HdfsScan picks up a range and schedules a read into a next available buffer.
+// If the range is more than the buffer size, then the range is read into 
multiple
+// chunks and schedules one chunk at a time alternating the buffers
+// Once the range is completed, the next range in the HdfsScanRange is picked 
up
+// for scheduling, till all the ranges assigned to the HdfsScan instance are 
read.
+
+
 public class HdfsScan 
 {
    static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
@@ -61,10 +68,13 @@ public class HdfsScan
    private int bufLen_[];
    private HDFSClient hdfsClient_[];
    private int currRange_;
-   private long currPos_;
-   private long lenRemain_;
+   private long currRangePos_;
+   private long currRangeLenRemain_;
    private int lastBufCompleted_ = -1;
    private boolean scanCompleted_;
+ 
+   // Structure to hold the Scan ranges for this HdfsScan instance
+   //
    
    class HdfsScanRange 
    {
@@ -95,6 +105,7 @@ public class HdfsScan
 
    public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String 
filename[], long pos[], long len[], int rangeNum[]) throws IOException
    {
+      // Two buffers to hold the data read
       buf_ = new ByteBuffer[2];
       bufLen_ = new int[2];
 
@@ -114,39 +125,47 @@ public class HdfsScan
       }
       if (hdfsScanRanges_.length > 0) {
          currRange_ = 0;
-         currPos_ = hdfsScanRanges_[currRange_].pos_;
-         lenRemain_ = hdfsScanRanges_[currRange_].len_; 
-         hdfsScanRange(0, 0);
+         currRangePos_ = hdfsScanRanges_[currRange_].pos_;
+         currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; 
+         scheduleHdfsScanRange(0, 0);
       }
       scanCompleted_ = false;
    }
 
-   public void hdfsScanRange(int bufNo, int bytesCompleted) throws IOException
+   public void scheduleHdfsScanRange(int bufNo, int bytesCompleted) throws 
IOException
    {
-      lenRemain_ -= bytesCompleted;
-      currPos_ += bytesCompleted; 
+      currRangeLenRemain_ -= bytesCompleted;
+      currRangePos_ += bytesCompleted; 
       int readLength;
-      if (lenRemain_ <= 0) {
+      if (currRangeLenRemain_ <= 0) {
          if (currRange_  == (hdfsScanRanges_.length-1)) {
             scanCompleted_ = true;
             return;
          }
          else {
             currRange_++;
-            currPos_ = hdfsScanRanges_[currRange_].pos_;
-            lenRemain_ = hdfsScanRanges_[currRange_].len_; 
+            currRangePos_ = hdfsScanRanges_[currRange_].pos_;
+            currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; 
          }
       } 
-      if (lenRemain_ > bufLen_[bufNo])
+      if (currRangeLenRemain_ > bufLen_[bufNo])
          readLength = bufLen_[bufNo];
       else
-         readLength = (int)lenRemain_;
+         readLength = (int)currRangeLenRemain_;
       if (! scanCompleted_) {
          if (logger_.isDebugEnabled())
-            logger_.debug(" CurrentRange " + 
hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + lenRemain_ + " BufNo 
" + bufNo); 
-         hdfsClient_[bufNo] = new HDFSClient(bufNo, 
hdfsScanRanges_[currRange_].tdbRangeNum_, 
hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+            logger_.debug(" CurrentRange " + 
hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + currRangeLenRemain_ 
+ " BufNo " + bufNo); 
+         hdfsClient_[bufNo] = new HDFSClient(bufNo, 
hdfsScanRanges_[currRange_].tdbRangeNum_, 
hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength);
       }
    } 
+  
+/* 
+   Method to wait for completion of the scheduled read of a chunk in a range
+   Returns 4 items, bytes read, buf no, range no, is EOF
+   If there are more chunks to be read in the range, schedules a read into the 
other buffer
+   If EOF is reached or the full range is read, the next range is picked up 
for 
+   scheduling
+*/
    
    public int[] trafHdfsRead() throws IOException, InterruptedException, 
ExecutionException
    {
@@ -164,12 +183,14 @@ public class HdfsScan
       switch (lastBufCompleted_) {
          case -1:
          case 1:
+            // Wait for the read to complete in buffer 0
             bytesRead = hdfsClient_[0].trafHdfsReadBuffer(); 
             bufNo = 0;
             rangeNo = hdfsClient_[0].getRangeNo();
             isEOF = hdfsClient_[0].isEOF();
             break;
          case 0:
+            // Wait for the read to complete in buffer 1
             bytesRead = hdfsClient_[1].trafHdfsReadBuffer(); 
             bufNo = 1;
             rangeNo = hdfsClient_[1].getRangeNo();
@@ -194,18 +215,20 @@ public class HdfsScan
             return retArray;
          } else {
             currRange_++;
-            currPos_ = hdfsScanRanges_[currRange_].pos_;
-            lenRemain_ = hdfsScanRanges_[currRange_].len_;
+            currRangePos_ = hdfsScanRanges_[currRange_].pos_;
+            currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_;
             bytesRead = 0;
          }
       }
       switch (lastBufCompleted_)
       {
          case 0:
-            hdfsScanRange(1, bytesRead);
+            // schedule the next chunk or next range to be read in buffer 1
+            scheduleHdfsScanRange(1, bytesRead);
             break;
          case 1:
-            hdfsScanRange(0, bytesRead);
+            // schedule the next chunk or next range to be read in buffer 0
+            scheduleHdfsScanRange(0, bytesRead);
             break;            
          default:
             break;
@@ -213,10 +236,22 @@ public class HdfsScan
       return retArray;
    } 
    
+   public void stop() throws IOException
+   {
+      if (hdfsClient_[0] != null)
+         hdfsClient_[0].stop();
+      if (hdfsClient_[1] != null)
+         hdfsClient_[1].stop();
+      hdfsClient_[0] = null;
+      hdfsClient_[1] = null;
+      return;
+   }
+
    public static void shutdown() throws InterruptedException
    {
       HDFSClient.shutdown();
    }
+
    public static void main(String[] args) throws Exception
    {
 

Reply via email to