Github user sureshsubbiah commented on a diff in the pull request: https://github.com/apache/trafodion/pull/1417#discussion_r167467533 --- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java --- @@ -0,0 +1,289 @@ +// @@@ START COPYRIGHT @@@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// @@@ END COPYRIGHT @@@ + +package org.trafodion.sql; + +// This class implements an efficient mechanism to read hdfs files +// Trafodion ExHdfsScan operator provides a range of scans to be performed. +// The range consists of a hdfs filename, offset and length to be read +// This class takes in two ByteBuffers. These ByteBuffers can be either direct buffers +// backed by native buffers or indirect buffers backed by Java arrays. +// All the ranges are read alternating between the two buffers using ExecutorService +// using CachedThreadPool mechanism. 
+// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read +// the next full or partial buffer while the main thread processes the previously +// read information from the other buffer + +import org.apache.log4j.PropertyConfigurator; +import org.apache.log4j.Logger; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configuration; +import java.nio.ByteBuffer; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ExecutionException; +import org.trafodion.sql.HDFSClient; + +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.fs.FileStatus; +import java.net.URI; + +public class HdfsScan +{ + static Logger logger_ = Logger.getLogger(HdfsScan.class.getName()); + private ByteBuffer buf_[]; + private int bufLen_[]; + private HDFSClient hdfsClient_[]; + private int currRange_; + private long currPos_; + private long lenRemain_; + private int lastBufCompleted_ = -1; + private boolean scanCompleted_; + + class HdfsScanRange + { + String filename_; + long pos_; + long len_; + int tdbRangeNum_; + + HdfsScanRange(String filename, long pos, long len, int tdbRangeNum) + { + filename_ = filename; + pos_ = pos; + len_ = len; + tdbRangeNum_ = tdbRangeNum; + } + } + + private HdfsScanRange hdfsScanRanges_[]; + + static { + String confFile = System.getProperty("trafodion.log4j.configFile"); + System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); + } + + public HdfsScan() + { + } + + public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int 
rangeNum[]) throws IOException + { + buf_ = new ByteBuffer[2]; + bufLen_ = new int[2]; + + buf_[0] = buf1; + buf_[1] = buf2; + + for (int i = 0; i < 2 ; i++) { + if (buf_[i].hasArray()) + bufLen_[i] = buf_[i].array().length; + else + bufLen_[i] = buf_[i].capacity(); + } + hdfsClient_ = new HDFSClient[2]; + hdfsScanRanges_ = new HdfsScanRange[filename.length]; + for (int i = 0; i < filename.length; i++) { + hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]); --- End diff -- An interesting test will be to create ranges (i.e., files) with 0 bytes. We know that Sqoop sometimes creates such files.
---