Github user sureshsubbiah commented on a diff in the pull request:
https://github.com/apache/trafodion/pull/1417#discussion_r164280283
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient
+{
+ static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+ private static Configuration config_ = null;
+ private static ExecutorService executorService_ = null;
+ private static FileSystem defaultFs_ = null;
+ private FileSystem fs_ = null;
+ private int bufNo_;
+ private FSDataInputStream fsdis_;
+ private OutputStream outStream_;
+ private String filename_;
+ private ByteBuffer buf_;
+ private int bufLen_;
+ private int bufOffset_ = 0;
+ private long pos_ = 0;
+ private int len_ = 0;
+ private int lenRemain_ = 0;
+ private int blockSize_;
+ private int bytesRead_;
+ private Future future_ = null;
+
+ static {
+ String confFile = System.getProperty("trafodion.log4j.configFile");
+ System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+ if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+ }
+ PropertyConfigurator.configure(confFile);
+ config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+ executorService_ = Executors.newCachedThreadPool();
+ try {
+ defaultFs_ = FileSystem.get(config_);
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static block", ioe);
+ }
+ }
+
+ class HDFSRead implements Callable
+ {
+ int length_;
+
+ HDFSRead(int length)
+ {
+ length_ = length;
+ }
+
+ public Object call() throws IOException
+ {
+ int bytesRead;
+ if (buf_.hasArray())
+ bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
+ else
+ {
+ buf_.limit(bufOffset_ + length_);
+ bytesRead = fsdis_.read(buf_);
+ }
+ return new Integer(bytesRead);
+ }
+ }
+
+ public HDFSClient()
+ {
+ }
+
+ public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+ {
+ bufNo_ = bufNo;
+ filename_ = filename;
+ Path filepath = new Path(filename_);
+ fs_ = FileSystem.get(filepath.toUri(),config_);
+ fsdis_ = fs_.open(filepath);
+ blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
+ buf_ = buffer;
+ bufOffset_ = 0;
+ pos_ = position;
+ len_ = length;
+ if (buffer.hasArray())
+ bufLen_ = buffer.array().length;
+ else
+ {
+ bufLen_ = buffer.capacity();
+ buf_.position(0);
+ }
+ lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
+ if (lenRemain_ != 0)
+ {
+ int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+ future_ = executorService_.submit(new HDFSRead(readLength));
+ }
+ }
+
+ public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+ {
+ Integer retObject = 0;
+ int bytesRead;
+ int readLength;
+
+ if (lenRemain_ == 0)
+ return 0;
+ retObject = (Integer)future_.get();
+ bytesRead = retObject.intValue();
+ if (bytesRead == -1)
+ return -1;
+ bufOffset_ += bytesRead;
+ pos_ += bytesRead;
+ lenRemain_ -= bytesRead;
+ if (bufOffset_ == bufLen_)
+ return bytesRead;
+ else if (bufOffset_ > bufLen_)
+ throw new IOException("Internal Error in trafHdfsRead ");
+ if (lenRemain_ == 0)
+ return bytesRead;
+ readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+ future_ = executorService_.submit(new HDFSRead(readLength));
+ return bytesRead;
+ }
+
+ public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
+ {
+ int bytesRead;
+ int totalBytesRead = 0;
+ while (true) {
+ bytesRead = trafHdfsRead();
+ if (bytesRead == -1 || bytesRead == 0)
+ return totalBytesRead;
+ totalBytesRead += bytesRead;
+ if (totalBytesRead == bufLen_)
+ return totalBytesRead;
+ }
+ }
+
+ boolean hdfsCreate(String fname , boolean compress) throws IOException
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("HDFSClient.hdfsCreate() - started" );
+ Path filePath = null;
+ if (!compress || (compress && fname.endsWith(".gz")))
+ filePath = new Path(fname);
+ else
+ filePath = new Path(fname + ".gz");
+
+ FileSystem fs = FileSystem.get(filePath.toUri(),config_);
+ FSDataOutputStream fsOut = fs.create(filePath, true);
+
+ if (compress) {
--- End diff --
Since adding a new compression type takes only a few lines of code, it would be good to look into expanding the set of compression formats we can write unload files in. I can file a JIRA if there is interest. This is unrelated to the focus of this change.
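
For illustration only, here is a minimal sketch of what that could look like, using Hadoop's CompressionCodecFactory to pick a codec by name instead of hard-coding GzipCodec. The helper class and method names are hypothetical and not part of this PR; only the Hadoop APIs shown (CompressionCodecFactory, CompressionCodec, FileSystem) are real.

```java
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Hypothetical helper, sketched for discussion -- not part of this change.
public class CompressedUnloadSketch
{
   // Open an output stream for 'fname', compressed with the codec named
   // 'codecName' (e.g. "gzip", "bzip2", "snappy"); null means no compression.
   static OutputStream createUnloadStream(Configuration conf, String fname,
                                          String codecName) throws IOException
   {
      if (codecName == null) {
         Path plain = new Path(fname);
         return FileSystem.get(plain.toUri(), conf).create(plain, true);
      }
      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
      CompressionCodec codec = factory.getCodecByName(codecName);
      if (codec == null)
         throw new IOException("Unknown compression codec: " + codecName);
      // Append the codec's default extension (".gz", ".bz2", ...) if missing.
      String ext = codec.getDefaultExtension();
      Path filePath = new Path(fname.endsWith(ext) ? fname : fname + ext);
      FileSystem fs = FileSystem.get(filePath.toUri(), conf);
      return codec.createOutputStream(fs.create(filePath, true));
   }
}
```

With something along these lines, hdfsCreate() would mostly just need to pass a codec name through instead of a boolean compress flag.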
---