[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360349#comment-16360349
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------

Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167474575
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
    @@ -0,0 +1,289 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +// This class implements an efficient mechanism to read hdfs files
    +// Trafodion ExHdfsScan operator provides a range of scans to be performed.
    +// The range consists of a hdfs filename, offset and length to be read
    +// This class takes in two ByteBuffers. These ByteBuffer can be either direct buffers
    +// backed up native buffers or indirect buffer backed by java arrays.
    +// All the ranges are read alternating between the two buffers using ExecutorService
    +// using CachedThreadPool mechanism. 
    +// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read
    +// the next full or partial buffer while the main thread processes the previously
    +// read information from the other buffer
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import org.trafodion.sql.HDFSClient;
    +
    +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.api.Table;
    +import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    +import org.apache.hadoop.fs.FileStatus;
    +import java.net.URI;
    +
    +public class HdfsScan 
    +{
    +   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
    +   private ByteBuffer buf_[];
    +   private int bufLen_[];
    +   private HDFSClient hdfsClient_[];
    +   private int currRange_;
    +   private long currPos_;
    +   private long lenRemain_;
    +   private int lastBufCompleted_ = -1;
    +   private boolean scanCompleted_;
    +   
    +   class HdfsScanRange 
    +   {
    +      String filename_;
    +      long pos_;
    +      long len_;
    +      int tdbRangeNum_;
    +      
    +      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
    +      {
    +         filename_ = filename;
    +         pos_ = pos;
    +         len_ = len;
    +         tdbRangeNum_ = tdbRangeNum;
    +      }
    +   }
    +   
    +   private HdfsScanRange hdfsScanRanges_[];
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +   }
    +
    +   public HdfsScan() 
    +   {
    +   }
    +
    +   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
    +   {
    +      buf_ = new ByteBuffer[2];
    +      bufLen_ = new int[2];
    +
    +      buf_[0] = buf1;
    +      buf_[1] = buf2;
    +
    +      for (int i = 0; i < 2 ; i++) {
    +          if (buf_[i].hasArray())
    +             bufLen_[i] = buf_[i].array().length;
    +          else
    +             bufLen_[i] = buf_[i].capacity();
    +      }
    +      hdfsClient_ = new HDFSClient[2];
    +      hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
    +      for (int i = 0; i < filename.length; i++) {
    +         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
    --- End diff --
    
    I think it should work. Let me confirm it
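
For readers following the thread, the alternating two-buffer scheme described in the class comment of the diff can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the pull request: DoubleBufferSketch, fill and readAll are hypothetical names, and an in-memory byte array stands in for the hdfs file so that the sketch is self-contained.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; this is not the HdfsScan class from the pull request.
final class DoubleBufferSketch {

    // Copies up to buf.capacity() bytes of src, starting at pos, into buf.
    // Returns the number of bytes copied and leaves buf ready for reading.
    static int fill(byte[] src, int pos, ByteBuffer buf) {
        buf.clear();
        int n = Math.min(buf.remaining(), src.length - pos);
        buf.put(src, pos, n);
        buf.flip();
        return n;
    }

    // Reads src chunk by chunk, alternating between two buffers. At most one
    // I/O task is outstanding: it fills one buffer while the calling thread
    // processes the buffer that was filled before it.
    static List<String> readAll(byte[] src, int bufSize) {
        ByteBuffer[] bufs = { ByteBuffer.allocate(bufSize), ByteBuffer.allocate(bufSize) };
        ExecutorService io = Executors.newCachedThreadPool();
        List<String> chunks = new ArrayList<>();
        try {
            int pos = 0;
            int cur = 0;
            Callable<Integer> first = () -> fill(src, 0, bufs[0]);
            Future<Integer> pending = io.submit(first);
            while (pending != null) {
                int n = pending.get();          // wait for the buffer being filled
                pos += n;
                int filled = cur;
                cur = 1 - cur;                  // switch to the other buffer
                if (pos < src.length) {
                    final int nextPos = pos;
                    final int next = cur;
                    Callable<Integer> task = () -> fill(src, nextPos, bufs[next]);
                    pending = io.submit(task);  // prefetch the next chunk
                } else {
                    pending = null;             // nothing left to read
                }
                // Process the completed buffer while the prefetch runs.
                if (n > 0) {
                    byte[] out = new byte[n];
                    bufs[filled].get(out);
                    chunks.add(new String(out, StandardCharsets.US_ASCII));
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return chunks;
    }
}
```

A buffer is handed back to the I/O task only after the calling thread has finished with it, so the two threads never touch the same buffer at the same time.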


> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> The compiler returns a list of scan ranges, along with the begin range and the 
> number of ranges to be done by each instance of TCB in the TDB. This list of 
> scan ranges is also re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or different hdfs files. The 
> TCB creates two threads to read these ranges. Two ranges (for the TCB) are 
> initially assigned to these threads. As and when a range is completed, the 
> next range (assigned for the TCB) is picked up by the thread. Ranges are read 
> in multiples of the hdfs scan buffer size at the TCB level. The default hdfs 
> scan buffer size is 64 MB. Rows from the hdfs scan buffer are processed and 
> moved into the up queue. If the range contains a record split, then the range 
> is extended to read up to the range tail IO size to get the full row. The 
> range that holds the latter part of the row ignores it because the former 
> range processes it. A record split at the file level is not possible and/or 
> not supported.
> For compression, the compiler returns the range info such that the hdfs scan 
> buffer can hold the full uncompressed buffer.
> Cons:
>   - The reader threads feature is too complex to maintain in C++.
>   - Error handling at the layer below the TCB is missing, or errors are not 
>     propagated to the work method, causing incorrect results.
>   - Possible multiple copying of data.
>   - Libhdfs calls are not optimized. It was observed that the method Ids are 
>     being obtained many times. Need to check if this problem still exists.
>   - Now that we clearly know what is expected, it could be optimized better:
>       - Reduced scan buffer size for smoother data flow
>       - Better thread utilization
>       - Avoid multiple copying of data
>   - Unable to comprehend the need for two threads for pre-fetch, especially 
>     when one range is completed fully before the data from the next range is 
>     processed.
> The following hdfs calls are used by programs in the exp and executor directories:
>                  U hdfsCloseFile
>                  U hdfsConnect
>                  U hdfsDelete
>                  U hdfsExists
>                  U hdfsFlush
>                  U hdfsFreeFileInfo
>                  U hdfsGetPathInfo
>                  U hdfsListDirectory
>                  U hdfsOpenFile
>                  U hdfsPread
>                  U hdfsRename
>                  U hdfsWrite
>                  U hdfsCreateDirectory
> New implementation
> Make changes to use direct Java APIs for these calls. However, come up with a 
> better mechanism to move the data between Java and JNI, avoid unnecessary 
> copying of data, and manage threads better via Executor concepts in Java. 
> Hence it won’t be a direct mapping of these calls to the hdfs Java API. 
> Instead, use an abstraction like the one used for HBase access.
> I believe the newer implementation will be better optimized and hence perform 
> better (though not by many fold).
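
The record-split rule in the outline above (a range finishes the row that starts inside it, and the following range skips the tail of that row) could look roughly like the sketch below. It is an assumption-laden illustration rather than the Trafodion implementation: RangeRowsSketch and rowsInRange are hypothetical names, rows are assumed to end with '\n', the file is an in-memory byte array, and the extension past the range end is not capped at a range tail IO size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not code from Trafodion.
final class RangeRowsSketch {

    // Returns the rows that START inside [pos, pos + len) of data.
    // A row that starts in the range but ends beyond it is read in full.
    // A partial row at the start of the range is skipped, because the
    // previous range is responsible for it.
    static List<String> rowsInRange(byte[] data, int pos, int len) {
        int end = Math.min(pos + len, data.length);
        int start = pos;

        // The range begins mid-row when the byte before it is not a delimiter.
        if (start > 0 && data[start - 1] != '\n') {
            while (start < data.length && data[start] != '\n') {
                start++;
            }
            start++; // step over the delimiter
        }

        List<String> rows = new ArrayList<>();
        while (start < end) {
            int stop = start;
            // This scan may run past 'end' to complete a split row.
            while (stop < data.length && data[stop] != '\n') {
                stop++;
            }
            rows.add(new String(data, start, stop - start, StandardCharsets.US_ASCII));
            start = stop + 1;
        }
        return rows;
    }
}
```

With this rule every row is returned by exactly one range, however the range boundaries fall relative to the row delimiters.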



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
