[ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342288#comment-16342288 ]
ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164280041

--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_;
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0;
+   private int blockSize_;
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+      String confFile = System.getProperty("trafodion.log4j.configFile");
+      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+      if (confFile == null) {
+         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+      }
+      PropertyConfigurator.configure(confFile);
+      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+      executorService_ = Executors.newCachedThreadPool();
+      try {
+         defaultFs_ = FileSystem.get(config_);
+      }
+      catch (IOException ioe) {
+         throw new RuntimeException("Exception in HDFSClient static block", ioe);
+      }
+   }
+
+   class HDFSRead implements Callable
+   {
+      int length_;
+
+      HDFSRead(int length)
+      {
+         length_ = length;
+      }
+
+      public Object call() throws IOException
+      {
+         int bytesRead;
+         if (buf_.hasArray())
+            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
+         else
+         {
+            buf_.limit(bufOffset_ + length_);
+            bytesRead = fsdis_.read(buf_);
+         }
+         return new Integer(bytesRead);
+      }
+   }
+
+   public HDFSClient()
+   {
+   }
+
+   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+   {
+      bufNo_ = bufNo;
+      filename_ = filename;
+      Path filepath = new Path(filename_);
+      fs_ = FileSystem.get(filepath.toUri(),config_);
+      fsdis_ = fs_.open(filepath);
+      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
+      buf_ = buffer;
+      bufOffset_ = 0;
+      pos_ = position;
+      len_ = length;
+      if (buffer.hasArray())
+         bufLen_ = buffer.array().length;
+      else
+      {
+         bufLen_ = buffer.capacity();
+         buf_.position(0);
+      }
+      lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
+      if (lenRemain_ != 0)
+      {
+         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+         future_ = executorService_.submit(new HDFSRead(readLength));
+      }
+   }
+
+   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   {
+      Integer retObject = 0;
+      int bytesRead;
+      int readLength;
+
+      if (lenRemain_ == 0)
+         return 0;
+      retObject = (Integer)future_.get();
+      bytesRead = retObject.intValue();
+      if (bytesRead == -1)
+         return -1;
+      bufOffset_ += bytesRead;
+      pos_ += bytesRead;
+      lenRemain_ -= bytesRead;
+      if (bufOffset_ == bufLen_)
+         return bytesRead;
+      else if (bufOffset_ > bufLen_)
+         throw new IOException("Internal Error in trafHdfsRead ");
+      if (lenRemain_ == 0)
+         return bytesRead;
+      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+      future_ = executorService_.submit(new HDFSRead(readLength));
+      return bytesRead;
+   }
+
+   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
+   {
+      int bytesRead;
+      int totalBytesRead = 0;
+      while (true) {
+         bytesRead = trafHdfsRead();
+         if (bytesRead == -1 || bytesRead == 0)
+            return totalBytesRead;
+         totalBytesRead += bytesRead;
+         if (totalBytesRead == bufLen_)
+            return totalBytesRead;
+      }
+   }
+
+   boolean hdfsCreate(String fname , boolean compress) throws IOException
+   {
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsCreate() - started" );
+      Path filePath = null;
+      if (!compress || (compress && fname.endsWith(".gz")))
+         filePath = new Path(fname);
+      else
+         filePath = new Path(fname + ".gz");
+
+      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
+      FSDataOutputStream fsOut = fs.create(filePath, true);
+
+      if (compress) {
+         GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
+         Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
+         outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
+      }
+      else
+         outStream_ = fsOut;
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" );
+      return true;
+   }
+
+   boolean hdfsWrite(byte[] buff, long len) throws IOException
+   {
+
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsWrite() - started" );
+      outStream_.write(buff);
+      outStream_.flush();
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - bytes written and flushed:" + len );
+      return true;
+   }
+
+   boolean hdfsClose() throws IOException
+   {
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" );
+      if (outStream_ != null) {
+         outStream_.close();
+         outStream_ = null;
+      }
+      return true;
+   }
+
+
+   public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
+   {
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start");
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path: " + srcPathStr +
+                                                  ", destination File:" + dstPathStr );
+      Path srcPath = new Path(srcPathStr );
+      srcPath = srcPath.makeQualified(srcPath.toUri(), null);
+      FileSystem srcFs = FileSystem.get(srcPath.toUri(),config_);
+
+      Path dstPath = new Path(dstPathStr);
+      dstPath = dstPath.makeQualified(dstPath.toUri(), null);
+      FileSystem dstFs = FileSystem.get(dstPath.toUri(),config_);
+
+      if (dstFs.exists(dstPath))
+      {
+         if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - destination files exists" );
+         // for this prototype we just delete the file-- will change in next code drops
+         dstFs.delete(dstPath, false);
--- End diff --

Maybe this, and the following two lines of comments, should just be removed? Are we supposed to silently delete an existing file and overwrite it with the new merged file?
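To make the question concrete, here is a minimal sketch of the non-destructive alternative, i.e. failing fast when the destination already exists instead of deleting it. This is only an illustration; the MergeTargetCheck class and ensureDestinationAbsent() method are hypothetical names, not part of the PR.

    // Hypothetical illustration only -- not part of the PR under review.
    // Instead of silently deleting an existing destination before the merge,
    // fail fast and let the caller decide whether an overwrite is intended.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class MergeTargetCheck
    {
       private MergeTargetCheck() { }

       // Throws if the destination already exists, so a merge never clobbers data silently.
       public static void ensureDestinationAbsent(String dstPathStr, Configuration conf) throws IOException
       {
          Path dstPath = new Path(dstPathStr);
          FileSystem dstFs = FileSystem.get(dstPath.toUri(), conf);
          if (dstFs.exists(dstPath))
             throw new IOException("hdfsMergeFiles: destination already exists: " + dstPath);
       }
    }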
> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> The compiler returns a list of scan ranges; the TDB gives each TCB instance its begin range and the number of ranges it should process. This list of scan ranges may also be re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or from different hdfs files. The TCB creates two threads to read these ranges. Two ranges (for the TCB) are initially assigned to these threads. As and when a range is completed, the thread picks up the next range assigned to the TCB. Ranges are read in multiples of the hdfs scan buffer size at the TCB level; the default hdfs scan buffer size is 64 MB. Rows from the hdfs scan buffer are processed and moved into the up queue. If a range contains a record split, the range is extended to read up to the range tail IO size to get the full row. The range that holds the latter part of the row ignores it, because the former range processes it. A record split at the file level is not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan buffer can hold the full uncompressed buffer.
> Cons:
> - The reader-threads feature is too complex to maintain in C++.
> - Error handling at the layer below the TCB is missing, or errors are not propagated to the work method, causing incorrect results.
> - Possible multiple copying of data.
> - Libhdfs calls are not optimized. It was observed that the method IDs are being obtained many times. Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better:
> - Reduced scan buffer size for smoother data flow
> - Better thread utilization
> - Avoid multiple copying of data
> It is unclear why two threads are needed for pre-fetch, especially when one range is completed fully before the data from the next range is processed.
> Following are the hdfsCalls used by programs in the exp and executor directories:
> U hdfsCloseFile
> U hdfsConnect
> U hdfsDelete
> U hdfsExists
> U hdfsFlush
> U hdfsFreeFileInfo
> U hdfsGetPathInfo
> U hdfsListDirectory
> U hdfsOpenFile
> U hdfsPread
> U hdfsRename
> U hdfsWrite
> U hdfsCreateDirectory
> New implementation
> Make changes to use the HDFS Java APIs directly for these calls. However, come up with a better mechanism to move data between Java and JNI, avoid unnecessary copying of data, and get better thread management via the Executor concepts in Java. Hence it won't be a direct mapping of these calls to the hdfs Java API; instead, use an abstraction like what is being done for HBase access.
> I believe the newer implementation will be better optimized and hence give improved performance (but not by many folds).
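As a rough illustration of the Executor-based read path the description above refers to (and that the diff implements): the JNI layer could allocate a direct ByteBuffer, hand it to HDFSClient together with a scan range, and let the prefetching HDFSRead callable fill it while trafHdfsReadBuffer() accumulates the bytes. This is only a sketch; the file path, buffer size, and the standalone driver class are assumptions for illustration, not part of the change.

    // Illustrative sketch only: assumed file path, buffer size, and driver class.
    import java.nio.ByteBuffer;

    import org.trafodion.sql.HDFSClient;

    public class HdfsReadSketch
    {
       public static void main(String[] args) throws Exception
       {
          String file = "hdfs:///user/trafodion/hive/demo.txt"; // assumed path
          int bufSize = 16 * 1024 * 1024;                       // assumed scan buffer size
          ByteBuffer buf = ByteBuffer.allocateDirect(bufSize);  // direct buffer, shareable with JNI

          // Read the range [0, bufSize) of the file; HDFSRead tasks prefetch
          // block-sized chunks on the shared ExecutorService behind the scenes.
          HDFSClient client = new HDFSClient(0 /* bufNo */, file, buf, 0 /* position */, bufSize);
          int totalRead = client.trafHdfsReadBuffer();
          System.out.println("bytes read: " + totalRead);
       }
    }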