[
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360302#comment-16360302
]
ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------
Github user sureshsubbiah commented on a diff in the pull request:
https://github.com/apache/trafodion/pull/1417#discussion_r167467533
--- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
@@ -0,0 +1,289 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read hdfs files
+// Trafodion ExHdfsScan operator provides a range of scans to be performed.
+// The range consists of a hdfs filename, offset and length to be read
+// This class takes in two ByteBuffers. These ByteBuffer can be either
direct buffers
+// backed up native buffers or indirect buffer backed by java arrays.
+// All the ranges are read alternating between the two buffers using
ExecutorService
+// using CachedThreadPool mechanism.
+// For a given HdfsScan instance, only one thread(IO thread) is scheduled
to read
+// the next full or partial buffer while the main thread processes the
previously
+// read information from the other buffer
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan
+{
+ static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+ private ByteBuffer buf_[];
+ private int bufLen_[];
+ private HDFSClient hdfsClient_[];
+ private int currRange_;
+ private long currPos_;
+ private long lenRemain_;
+ private int lastBufCompleted_ = -1;
+ private boolean scanCompleted_;
+
+ class HdfsScanRange
+ {
+ String filename_;
+ long pos_;
+ long len_;
+ int tdbRangeNum_;
+
+ HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+ {
+ filename_ = filename;
+ pos_ = pos;
+ len_ = len;
+ tdbRangeNum_ = tdbRangeNum;
+ }
+ }
+
+ private HdfsScanRange hdfsScanRanges_[];
+
+ static {
+ String confFile = System.getProperty("trafodion.log4j.configFile");
+ System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+ }
+
+ public HdfsScan()
+ {
+ }
+
+ public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String
filename[], long pos[], long len[], int rangeNum[]) throws IOException
+ {
+ buf_ = new ByteBuffer[2];
+ bufLen_ = new int[2];
+
+ buf_[0] = buf1;
+ buf_[1] = buf2;
+
+ for (int i = 0; i < 2 ; i++) {
+ if (buf_[i].hasArray())
+ bufLen_[i] = buf_[i].array().length;
+ else
+ bufLen_[i] = buf_[i].capacity();
+ }
+ hdfsClient_ = new HDFSClient[2];
+ hdfsScanRanges_ = new HdfsScanRange[filename.length];
+ for (int i = 0; i < filename.length; i++) {
+ hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i],
len[i], rangeNum[i]);
--- End diff --
An interesting will be to create ranges, (i.e. files) with 0 bytes. We know
that sqoop sometimes creates such files.
> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
> Key: TRAFODION-2917
> URL: https://issues.apache.org/jira/browse/TRAFODION-2917
> Project: Apache Trafodion
> Issue Type: New Feature
> Components: sql-general
> Reporter: Selvaganesan Govindarajan
> Priority: Major
> Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> Compiler returns a list of scan ranges and the begin range and number of
> ranges to be done by each instance of TCB in TDB. This list of scan ranges is
> also re-computed at run time possibly based on a CQD
> The scan range for a TCB can come from the same or different hdfs files. TCB
> creates two threads to read these ranges.Two ranges (for the TCB) are
> initially assigned to these threads. As and when a range is completed, the
> next range (assigned for the TCB) is picked up by the thread. Ranges are read
> in multiples of hdfs scan buffer size at the TCB level. Default hdfs scan
> buffer size is 64 MB. Rows from hdfs scan buffer is processed and moved into
> up queue. If the range contains a record split, then the range is extended to
> read up to range tail IO size to get the full row. The range that had the
> latter part of the row ignores it because the former range processes it.
> Record split at the file level is not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan
> buffer can hold the full uncompressed buffer.
> Cons:
> Reader threads feature too complex to maintain in C++
> Error handling at the layer below the TCB is missing or errors are not
> propagated to work method causing incorrect results
> Possible multiple copying of data
> Libhdfs calls are not optimized. It was observed that the method Ids are
> being obtained many times. Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better
> - Reduced scan buffer size for smoother data flow
> - Better thread utilization
> - Avoid multiple copying of data.
> Unable to comprehend the need for two threads for pre-fetch especially when
> one range is completed fully before the data from next range is processed.
> Following are the hdfsCalls used by programs at exp and executor directory.
> U hdfsCloseFile
> U hdfsConnect
> U hdfsDelete
> U hdfsExists
> U hdfsFlush
> U hdfsFreeFileInfo
> U hdfsGetPathInfo
> U hdfsListDirectory
> U hdfsOpenFile
> U hdfsPread
> U hdfsRename
> U hdfsWrite
> U hdfsCreateDirectory
> New implementation
> Make changes to use direct Java APIs for these calls. However, come up with
> better mechanism to move the data from Java and JNI, avoid unnecessary
> copying of data, better thread management via Executor concepts in Java.
> Hence it won’t be direct mapping of these calls to hdfs Java API. Instead,
> use the abstraction like what is being done for HBase access.
> I believe newer implementation will be optimized better and hence improved
> performance. (but not many folds)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)