[ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360349#comment-16360349 ]
ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474575

--- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
@@ -0,0 +1,289 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read hdfs files.
+// The Trafodion ExHdfsScan operator provides a set of scan ranges to be performed.
+// Each range consists of an hdfs filename, an offset, and a length to be read.
+// This class takes in two ByteBuffers. These ByteBuffers can be either direct buffers
+// backed by native buffers or indirect buffers backed by Java arrays.
+// All the ranges are read alternating between the two buffers, using an
+// ExecutorService with a cached thread pool.
+// For a given HdfsScan instance, only one thread (the IO thread) is scheduled to read
+// the next full or partial buffer while the main thread processes the previously
+// read information from the other buffer.
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan
+{
+   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+   private ByteBuffer buf_[];
+   private int bufLen_[];
+   private HDFSClient hdfsClient_[];
+   private int currRange_;
+   private long currPos_;
+   private long lenRemain_;
+   private int lastBufCompleted_ = -1;
+   private boolean scanCompleted_;
+
+   class HdfsScanRange
+   {
+      String filename_;
+      long pos_;
+      long len_;
+      int tdbRangeNum_;
+
+      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+      {
+         filename_ = filename;
+         pos_ = pos;
+         len_ = len;
+         tdbRangeNum_ = tdbRangeNum;
+      }
+   }
+
+   private HdfsScanRange hdfsScanRanges_[];
+
+   static {
+      String confFile = System.getProperty("trafodion.log4j.configFile");
+      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+   }
+
+   public HdfsScan()
+   {
+   }
+
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
+   {
+      buf_ = new ByteBuffer[2];
+      bufLen_ = new int[2];
+
+      buf_[0] = buf1;
+      buf_[1] = buf2;
+
+      for (int i = 0; i < 2 ; i++) {
+         if (buf_[i].hasArray())
+            bufLen_[i] = buf_[i].array().length;
+         else
+            bufLen_[i] = buf_[i].capacity();
+      }
+      hdfsClient_ = new HDFSClient[2];
+      hdfsScanRanges_ = new HdfsScanRange[filename.length];
+      for (int i = 0; i < filename.length; i++) {
+         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
--- End diff --

I think it should work. Let me confirm it.

> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> The compiler returns a list of scan ranges; the TDB specifies the begin range
> and the number of ranges to be done by each TCB instance. This list of scan
> ranges is also re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or different hdfs files. The
> TCB creates two threads to read these ranges. Two ranges (for the TCB) are
> initially assigned to these threads. As and when a range is completed, the
> next range (assigned to the TCB) is picked up by the thread. Ranges are read
> in multiples of the hdfs scan buffer size at the TCB level. The default hdfs
> scan buffer size is 64 MB. Rows from the hdfs scan buffer are processed and
> moved into the up queue. If the range contains a record split, the range is
> extended by up to the range tail IO size to get the full row. The range that
> had the latter part of the row ignores it, because the former range processes it.
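The range-tail behavior described above can be sketched as a small helper. This is an illustrative sketch, not Trafodion code: the name RANGE_TAIL_IO_SIZE, its value, and the helper itself are assumptions, showing only the idea that every range except the last in a file reads an extra tail so a row split across the range boundary can be completed.

```java
// Illustrative sketch (hypothetical names, not Trafodion code): a scan range
// whose end may fall in the middle of a record is extended by a tail IO size
// so the last row can be read whole; the next range skips its partial first row.
class RangeTailSketch
{
   static final long RANGE_TAIL_IO_SIZE = 16 * 1024;  // assumed tail size

   // Bytes to actually read for a range [pos, pos + len) of a file of length
   // fileLen: every range except the last one in the file reads an extra tail
   // so a row split across the range boundary can be completed.
   static long bytesToRead(long pos, long len, long fileLen) {
      long end = pos + len;
      if (end >= fileLen)
         return fileLen - pos;   // last range: nothing beyond EOF to read
      return Math.min(end + RANGE_TAIL_IO_SIZE, fileLen) - pos;
   }
}
```

The clamp against fileLen matters: a range near the end of the file must not try to read a full tail past EOF.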
> Record split at the file level is not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan
> buffer can hold the full uncompressed buffer.
> Cons:
> - The reader threads feature is too complex to maintain in C++.
> - Error handling at the layer below the TCB is missing, or errors are not
>   propagated to the work method, causing incorrect results.
> - Possible multiple copying of data.
> - Libhdfs calls are not optimized. It was observed that the method IDs are
>   being obtained many times. Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better:
> - Reduced scan buffer size for smoother data flow
> - Better thread utilization
> - Avoid multiple copying of data
> Unable to comprehend the need for two threads for pre-fetch, especially when
> one range is completed fully before the data from the next range is processed.
> The following are the hdfs calls used by programs in the exp and executor
> directories:
> U hdfsCloseFile
> U hdfsConnect
> U hdfsDelete
> U hdfsExists
> U hdfsFlush
> U hdfsFreeFileInfo
> U hdfsGetPathInfo
> U hdfsListDirectory
> U hdfsOpenFile
> U hdfsPread
> U hdfsRename
> U hdfsWrite
> U hdfsCreateDirectory
> New implementation:
> Make changes to use direct Java APIs for these calls. However, come up with a
> better mechanism to move the data between Java and JNI, avoid unnecessary
> copying of data, and manage threads better via Executor concepts in Java.
> Hence it won't be a direct mapping of these calls to the hdfs Java API.
> Instead, use an abstraction like what is being done for HBase access.
> I believe the newer implementation will be optimized better and hence improve
> performance (but not many-fold).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
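The Executor-based double buffering that the new implementation proposes (and that the HdfsScan class comment in the diff describes) might look roughly like the sketch below. The class, the Source interface, and the buffer size are illustrative assumptions rather than the actual Trafodion code; the point is only the mechanism: a cached thread pool runs at most one outstanding IO task filling one buffer while the caller drains the other, then the roles swap.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the double-buffer scheme, not Trafodion code:
// one IO task fills one buffer while the caller processes the other.
class DoubleBufferScan
{
   // Stands in for an HDFS input stream; returns bytes read, or -1 at end.
   interface Source { int read(ByteBuffer buf); }

   private final ExecutorService ioPool = Executors.newCachedThreadPool();
   private final ByteBuffer[] bufs =
      { ByteBuffer.allocate(8192), ByteBuffer.allocate(8192) };
   private Future<Integer> pending;  // at most one outstanding IO task
   private int next = 0;             // index of the buffer the next read fills

   // Schedule a read into the next buffer; the other buffer is left
   // untouched so the caller can keep processing it concurrently.
   void scheduleRead(Source src) {
      final ByteBuffer buf = bufs[next];
      buf.clear();
      pending = ioPool.submit(() -> src.read(buf));
      next = 1 - next;
   }

   // Block until the outstanding read finishes; returns its byte count.
   int awaitRead() throws InterruptedException, ExecutionException {
      return pending.get();
   }

   void shutdown() { ioPool.shutdown(); }
}
```

A caller would alternate scheduleRead and awaitRead: kick off the first read, then in a loop wait for the pending read, schedule the next one into the other buffer, and process the buffer that just completed while the IO thread works ahead.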