[ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342288#comment-16342288 ]

ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------

Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164280041
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      int length_;
    +
    +      HDFSRead(int length) 
    +      {
    +         length_ = length;
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         if (buf_.hasArray())
    +            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
    +         else
    +         {
    +            buf_.limit(bufOffset_ + length_);
    +            bytesRead = fsdis_.read(buf_);
    +         }
    +         return new Integer(bytesRead);
    +      }
    +   }
    +       
    +   public HDFSClient() 
    +   {
    +   }
    + 
    +   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
    +   {
    +      bufNo_ = bufNo; 
    +      filename_ = filename;
    +      Path filepath = new Path(filename_);
    +      fs_ = FileSystem.get(filepath.toUri(),config_);
    +      fsdis_ = fs_.open(filepath);
    +      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
    +      buf_  = buffer;
    +      bufOffset_ = 0;
    +      pos_ = position;
    +      len_ = length;
    +      if (buffer.hasArray()) 
    +         bufLen_ = buffer.array().length;
    +      else
    +      {
    +         bufLen_ = buffer.capacity();
    +         buf_.position(0);
    +      }
    +      lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
    +      if (lenRemain_ != 0)
    +      {
    +         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
    +         future_ = executorService_.submit(new HDFSRead(readLength));
    +      }
    +   }
    +
    +   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
    +   {
    +      Integer retObject = 0;
    +      int bytesRead;
    +      int readLength;
    +       
    +      if (lenRemain_ == 0)
    +         return 0;
    +      retObject = (Integer)future_.get();
    +      bytesRead = retObject.intValue();
    +      if (bytesRead == -1)
    +         return -1;
    +      bufOffset_ += bytesRead;
    +      pos_ += bytesRead;
    +      lenRemain_ -= bytesRead;
    +      if (bufOffset_ == bufLen_)
    +         return bytesRead; 
    +      else if (bufOffset_ > bufLen_)
    +         throw new IOException("Internal Error in trafHdfsRead ");
    +      if (lenRemain_ == 0)
    +         return bytesRead; 
    +      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
    +      future_ = executorService_.submit(new HDFSRead(readLength));
    +      return bytesRead;
    +   } 
    +
    +   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
    +   {
    +      int bytesRead;
    +      int totalBytesRead = 0;
    +      while (true) {
    +         bytesRead = trafHdfsRead();
    +         if (bytesRead == -1 || bytesRead == 0)
    +            return totalBytesRead;
    +         totalBytesRead += bytesRead;
    +         if (totalBytesRead == bufLen_)
    +              return totalBytesRead;
    +      }  
    +   } 
    +
    +   boolean hdfsCreate(String fname , boolean compress) throws IOException
    +   {
    +     if (logger_.isDebugEnabled()) 
    +        logger_.debug("HDFSClient.hdfsCreate() - started" );
    +      Path filePath = null;
    +      if (!compress || (compress && fname.endsWith(".gz")))
    +        filePath = new Path(fname);
    +      else
    +        filePath = new Path(fname + ".gz");
    +        
    +      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
    +      FSDataOutputStream fsOut = fs.create(filePath, true);
    +      
    +      if (compress) {
    +        GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance(GzipCodec.class, config_);
    +        Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
    +        outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
    +      }
    +      else
    +        outStream_ = fsOut;      
    +      if (logger_.isDebugEnabled()) 
    +         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream 
created" );
    +      return true;
    +    }
    +    
    +    boolean hdfsWrite(byte[] buff, long len) throws IOException
    +    {
    +
    +      if (logger_.isDebugEnabled()) 
    +         logger_.debug("HDFSClient.hdfsWrite() - started" );
    +      outStream_.write(buff);
    +      outStream_.flush();
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() 
- bytes written and flushed:" + len  );
    +      return true;
    +    }
    +    
    +    boolean hdfsClose() throws IOException
    +    {
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() 
- started" );
    +      if (outStream_ != null) {
    +          outStream_.close();
    +          outStream_ = null;
    +      }
    +      return true;
    +    }
    +
    +    
    +    public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
    +    {
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start");
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path: " + srcPathStr +
    +                                               ", destination File:" + dstPathStr );
    +        Path srcPath = new Path(srcPathStr );
    +        srcPath = srcPath.makeQualified(srcPath.toUri(), null);
    +        FileSystem srcFs = FileSystem.get(srcPath.toUri(),config_);
    +  
    +        Path dstPath = new Path(dstPathStr);
    +        dstPath = dstPath.makeQualified(dstPath.toUri(), null);
    +        FileSystem dstFs = FileSystem.get(dstPath.toUri(),config_);
    +        
    +        if (dstFs.exists(dstPath))
    +        {
    +          if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - destination file exists" );
    +          // for this prototype we just delete the file -- will change in next code drops
    +          dstFs.delete(dstPath, false);
    --- End diff --
    
    Maybe this and the following two lines of comments should just be removed? 
    Are we supposed to silently delete the existing file and overwrite it with 
    the new merged file?
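
For illustration, a minimal sketch of the alternative the question above hints at: failing fast instead of silently deleting the destination. The names dstFs and dstPath come from the diff; the decision to throw (rather than take an explicit overwrite flag) is an assumption, not the author's implementation.

    // Hypothetical alternative inside hdfsMergeFiles(): refuse to overwrite.
    if (dstFs.exists(dstPath)) {
       throw new org.apache.hadoop.fs.FileAlreadyExistsException(
             "hdfsMergeFiles: destination already exists: " + dstPath);
    }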


> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> The compiler returns, in the TDB, a list of scan ranges along with the begin 
> range and the number of ranges to be done by each TCB instance. This list of 
> scan ranges may also be re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or from different hdfs 
> files. The TCB creates two threads to read these ranges. Two ranges (for the 
> TCB) are initially assigned to these threads. As and when a range is 
> completed, the next range (assigned to the TCB) is picked up by the thread. 
> Ranges are read in multiples of the hdfs scan buffer size at the TCB level; 
> the default hdfs scan buffer size is 64 MB. Rows from the hdfs scan buffer 
> are processed and moved into the up queue. If a range contains a record 
> split, the range is extended by up to the range tail IO size to read the 
> full row. The range that holds the latter part of the row ignores it, 
> because the former range processes it. A record split across hdfs files is 
> not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan 
> buffer can hold the full uncompressed buffer.
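
A minimal sketch of the buffer-carving arithmetic described above, using hypothetical names (ScanRangePlanner, rangeLen, bufLen, blockSize); the actual logic lives in the HDFSClient constructor and trafHdfsRead() in the diff above:

    class ScanRangePlanner {      // hypothetical name, for illustration only
       // Carve one scan range into block-sized reads, clamped to the hdfs
       // scan buffer size (64 MB by default).
       static java.util.List<Integer> planReads(int rangeLen, int bufLen, int blockSize) {
          java.util.List<Integer> reads = new java.util.ArrayList<Integer>();
          int lenRemain = Math.min(rangeLen, bufLen);    // never read past one buffer
          while (lenRemain > 0) {
             int readLen = Math.min(blockSize, lenRemain);
             reads.add(readLen);
             lenRemain -= readLen;
          }
          return reads;
       }
    }
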
>  Cons:
> - The reader threads feature is too complex to maintain in C++.
> - Error handling at the layer below the TCB is missing, or errors are not 
>   propagated to the work method, causing incorrect results.
> - Possible multiple copying of data.
> - Libhdfs calls are not optimized. It was observed that the method Ids are 
>   being obtained many times. Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better:
>   - Reduced scan buffer size for smoother data flow
>   - Better thread utilization
>   - Avoid multiple copying of data
> Unable to comprehend the need for two threads for pre-fetch, especially when 
> one range is completed fully before data from the next range is processed.
>  Following are the hdfs calls used by programs in the exp and executor 
> directories (a rough mapping of some of them to the Hadoop Java API is 
> sketched after this list):
>                   U hdfsCloseFile
>                  U hdfsConnect
>                  U hdfsDelete
>                  U hdfsExists
>                  U hdfsFlush
>                  U hdfsFreeFileInfo
>                  U hdfsGetPathInfo
>                  U hdfsListDirectory
>                  U hdfsOpenFile
>                  U hdfsPread
>                  U hdfsRename
>                  U hdfsWrite
>                  U hdfsCreateDirectory
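
For several of the calls above the Hadoop Java API has a direct counterpart. The sketch below is illustrative only (hypothetical class name, made-up path) and is not necessarily the mapping the refactor settles on:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // hdfsConnect -> FileSystem.get, hdfsExists -> exists, hdfsDelete -> delete,
    // hdfsGetPathInfo -> getFileStatus, hdfsListDirectory -> listStatus,
    // hdfsCreateDirectory -> mkdirs, hdfsRename -> rename,
    // hdfsOpenFile/hdfsPread -> open + FSDataInputStream.read(pos, buf, off, len)
    class HdfsCallMapping {       // hypothetical name, for illustration only
       static void demo(String pathStr) throws java.io.IOException {
          FileSystem fs = FileSystem.get(new Configuration());   // hdfsConnect
          Path p = new Path(pathStr);
          if (fs.exists(p)) {                                    // hdfsExists
             FileStatus st = fs.getFileStatus(p);                // hdfsGetPathInfo
             System.out.println(st.getLen());
          }
          for (FileStatus f : fs.listStatus(p.getParent()))      // hdfsListDirectory
             System.out.println(f.getPath());
       }
    }
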
>  New implementation
>  Make changes to use the Java APIs directly for these calls. However, also 
> come up with a better mechanism to move data between Java and JNI, avoid 
> unnecessary copying of data, and get better thread management via the 
> Executor framework in Java. Hence it won't be a direct mapping of these 
> calls to the hdfs Java API; instead, use an abstraction similar to what is 
> being done for HBase access.
>  I believe the newer implementation will be better optimized and hence give 
> improved performance (though not by many fold).
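
To make the intended usage concrete, here is a hedged sketch of how the HDFSClient class from the diff above might be driven; the constructor signature and trafHdfsReadBuffer() are taken from the diff, while the driver class, file name, and sizes are made-up examples:

    import java.nio.ByteBuffer;
    import org.trafodion.sql.HDFSClient;

    class HdfsScanSketch {        // hypothetical driver, for illustration only
       static void readOneRange() throws Exception {
          // Read up to 64 MB of one scan range into a direct buffer.
          ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024 * 1024);
          HDFSClient client = new HDFSClient(0 /* bufNo */,
                "hdfs:///user/hive/warehouse/t1/part-00000",   // hypothetical file
                buf, 0L /* start offset */, 64 * 1024 * 1024 /* range length */);
          int totalRead = client.trafHdfsReadBuffer();  // drives the block-sized async reads
          System.out.println("bytes read: " + totalRead);
       }
    }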


