Repository: trafodion
Updated Branches:
  refs/heads/master 087af70db -> de8357677
http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
new file mode 100644
index 0000000..8d2052f
--- /dev/null
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_;
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0;
+   private int blockSize_;
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+      String confFile = System.getProperty("trafodion.log4j.configFile");
+      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+      if (confFile == null) {
+         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+      }
+      PropertyConfigurator.configure(confFile);
+      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+      executorService_ = Executors.newCachedThreadPool();
+      try {
+         defaultFs_ = FileSystem.get(config_);
+      }
+      catch (IOException ioe) {
+         throw new RuntimeException("Exception in HDFSClient static block", ioe);
+      }
+   }
+
+   class HDFSRead implements Callable
+   {
+      int length_;
+
+      HDFSRead(int length)
+      {
+         length_ = length;
+      }
+
+      public Object call() throws IOException
+      {
+         int bytesRead;
+         if (buf_.hasArray())
+            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
+         else
+         {
+            buf_.limit(bufOffset_ + length_);
+            bytesRead = fsdis_.read(buf_);
+         }
+         return new Integer(bytesRead);
+      }
+   }
+
+   public HDFSClient()
+   {
+   }
+
+   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+   {
+      bufNo_ = bufNo;
+      filename_ = filename;
+      Path filepath = new Path(filename_);
+      fs_ = FileSystem.get(filepath.toUri(),config_);
+      fsdis_ = fs_.open(filepath);
+      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
+      buf_ = buffer;
+      bufOffset_ = 0;
+      pos_ = position;
+      len_ = length;
+      if (buffer.hasArray())
+         bufLen_ = buffer.array().length;
+      else
+      {
+         bufLen_ = buffer.capacity();
+         buf_.position(0);
+      }
+      lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
+      if (lenRemain_ != 0)
+      {
+         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+         future_ = executorService_.submit(new HDFSRead(readLength));
+      }
+   }
+
+   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   {
+      Integer retObject = 0;
+      int bytesRead;
+      int readLength;
+
+      if (lenRemain_ == 0)
+         return 0;
+      retObject = (Integer)future_.get();
+      bytesRead = retObject.intValue();
+      if (bytesRead == -1)
+         return -1;
+      bufOffset_ += bytesRead;
+      pos_ += bytesRead;
+      lenRemain_ -= bytesRead;
+      if (bufOffset_ == bufLen_)
+         return bytesRead;
+      else if (bufOffset_ > bufLen_)
+         throw new IOException("Internal Error in trafHdfsRead ");
+      if (lenRemain_ == 0)
+         return bytesRead;
+      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+      future_ = executorService_.submit(new HDFSRead(readLength));
+      return bytesRead;
+   }
+
+   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
+   {
+      int bytesRead;
+      int totalBytesRead = 0;
+      while (true) {
+         bytesRead = trafHdfsRead();
+         if (bytesRead == -1 || bytesRead == 0)
+            return totalBytesRead;
+         totalBytesRead += bytesRead;
+         if (totalBytesRead == bufLen_)
+            return totalBytesRead;
+      }
+   }
+
+   boolean hdfsCreate(String fname , boolean compress) throws IOException
+   {
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsCreate() - started" );
+      Path filePath = null;
+      if (!compress || (compress && fname.endsWith(".gz")))
+         filePath = new Path(fname);
+      else
+         filePath = new Path(fname + ".gz");
+
+      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
+      FSDataOutputStream fsOut = fs.create(filePath, true);
+
+      if (compress) {
+         GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
+         Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
+         outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
+      }
+      else
+         outStream_ = fsOut;
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" );
+      return true;
+   }
+
+   boolean hdfsWrite(byte[] buff, long len) throws IOException
+   {
+
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsWrite() - started" );
+      outStream_.write(buff);
+      outStream_.flush();
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - bytes written and flushed:" + len );
+      return true;
+   }
+
+   boolean hdfsClose() throws IOException
+   {
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" );
+      if (outStream_ != null) {
+         outStream_.close();
+         outStream_ = null;
+      }
+      return true;
+   }
+
+
+   public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
+   {
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start");
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path: " + srcPathStr +
+               ", destination File:" + dstPathStr );
+      Path srcPath = new Path(srcPathStr );
+      srcPath = srcPath.makeQualified(srcPath.toUri(), null);
+      FileSystem srcFs = FileSystem.get(srcPath.toUri(),config_);
+
+      Path dstPath = new Path(dstPathStr);
+      dstPath = dstPath.makeQualified(dstPath.toUri(), null);
+      FileSystem dstFs = FileSystem.get(dstPath.toUri(),config_);
+
+      if (dstFs.exists(dstPath))
+      {
+         if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - destination files exists" );
+         // for this prototype we just delete the file-- will change in next code drops
+         dstFs.delete(dstPath, false);
+         // The caller should already have checked existence of file-- throw exception
+         //throw new FileAlreadyExistsException(dstPath.toString());
+      }
+
+      Path tmpSrcPath = new Path(srcPath, "tmp");
+
+      FileSystem.mkdirs(srcFs, tmpSrcPath,srcFs.getFileStatus(srcPath).getPermission());
+      logger_.debug("HDFSClient.hdfsMergeFiles() - tmp folder created." );
+      Path[] files = FileUtil.stat2Paths(srcFs.listStatus(srcPath));
+      for (Path f : files)
+      {
+         srcFs.rename(f, tmpSrcPath);
+      }
+      // copyMerge and use false for the delete option since it removes the whole directory
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - copyMerge" );
+      FileUtil.copyMerge(srcFs, tmpSrcPath, dstFs, dstPath, false, config_, null);
+
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - delete intermediate files" );
+      srcFs.delete(tmpSrcPath, true);
+      return true;
+   }
+
+   public boolean hdfsCleanUnloadPath(String uldPathStr
+                         /*, boolean checkExistence, String mergeFileStr*/) throws IOException
+   {
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsCleanUnloadPath() - unload Path: " + uldPathStr );
+
+      Path uldPath = new Path(uldPathStr );
+      FileSystem fs = FileSystem.get(uldPath.toUri(), config_);
+      if (!fs.exists(uldPath))
+      {
+         //unload location does not exist. hdfscreate will create it later
+         //nothing to do
+         return true;
+      }
+
+      Path[] files = FileUtil.stat2Paths(fs.listStatus(uldPath));
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsCleanUnloadPath() - delete files" );
+      for (Path f : files){
+         fs.delete(f, false);
+      }
+      return true;
+   }
+
+   public boolean hdfsExists(String filePathStr) throws IOException
+   {
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsExists() - Path: " + filePathStr);
+
+      Path filePath = new Path(filePathStr );
+      FileSystem fs = FileSystem.get(filePath.toUri(), config_);
+      if (fs.exists(filePath))
+         return true;
+      return false;
+   }
+
+   public boolean hdfsDeletePath(String pathStr) throws IOException
+   {
+      if (logger_.isDebugEnabled())
+         logger_.debug("HDFSClient.hdfsDeletePath() - start - Path: " + pathStr);
+      Path delPath = new Path(pathStr );
+      FileSystem fs = FileSystem.get(delPath.toUri(), config_);
+      fs.delete(delPath, true);
+      return true;
+   }
+
+   public static void shutdown() throws InterruptedException
+   {
+      executorService_.awaitTermination(100, TimeUnit.MILLISECONDS);
+      executorService_.shutdown();
+   }
+}
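A minimal usage sketch for the class above, illustrative only and not part of this change: it drives one HDFSClient range read into a direct ByteBuffer via the public constructor and trafHdfsReadBuffer(). The class name HDFSClientReadExample, the file path and the buffer size are made-up placeholders; it also assumes TRAF_HOME and TRAF_CONF are set so the static initializer can configure log4j and the HDFS Configuration.

import java.nio.ByteBuffer;
import org.trafodion.sql.HDFSClient;

public class HDFSClientReadExample
{
   public static void main(String[] args) throws Exception
   {
      // Hypothetical HDFS file; substitute a real path.
      String file = "hdfs:///user/trafodion/hive/exttables/t1/000000_0";
      ByteBuffer buf = ByteBuffer.allocateDirect(4 * 1024 * 1024);
      // The constructor opens the file and schedules the first block-sized
      // read on the shared cached thread pool.
      HDFSClient client = new HDFSClient(0, file, buf, 0, buf.capacity());
      // Blocks until the buffer is full or the requested range is exhausted.
      int bytesRead = client.trafHdfsReadBuffer();
      System.out.println("bytes read: " + bytesRead);
      HDFSClient.shutdown();
   }
}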
http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
new file mode 100644
index 0000000..bf81ab0
--- /dev/null
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -0,0 +1,248 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read hdfs files
+// Trafodion ExHdfsScan operator provides a range of scans to be performed.
+// The range consists of a hdfs filename, offset and length to be read
+// This class takes in two ByteBuffers. These ByteBuffer can be either direct buffers
+// backed up native buffers or indirect buffer backed by java arrays.
+// All the ranges are read alternating between the two buffers using ExecutorService
+// using CachedThreadPool mechanism.
+// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read
+// the next full or partial buffer while the main thread processes the previously
+// read information from the other buffer
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan
+{
+   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+   private ByteBuffer buf_[];
+   private int bufLen_[];
+   private HDFSClient hdfsClient_[];
+   private int currRange_;
+   private long currPos_;
+   private long lenRemain_;
+   private int lastBufCompleted_ = -1;
+   private boolean scanCompleted_;
+   private boolean lastScanRangeScheduled_;
+
+   class HdfsScanRange
+   {
+      String filename_;
+      long pos_;
+      long len_;
+
+      HdfsScanRange(String filename, long pos, long len)
+      {
+         filename_ = filename;
+         pos_ = pos;
+         len_ = len;
+      }
+   }
+
+   private HdfsScanRange hdfsScanRanges_[];
+
+   static {
+      String confFile = System.getProperty("trafodion.log4j.configFile");
+      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+   }
+
+   HdfsScan(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[]) throws IOException
+   {
+      buf_ = new ByteBuffer[2];
+      bufLen_ = new int[2];
+
+      buf_[0] = buf1;
+      buf_[1] = buf2;
+
+      for (int i = 0; i < 2 ; i++) {
+          if (buf_[i].hasArray())
+             bufLen_[i] = buf_[i].array().length;
+          else
+             bufLen_[i] = buf_[i].capacity();
+      }
+      hdfsClient_ = new HDFSClient[2];
+      hdfsScanRanges_ = new HdfsScanRange[filename.length];
+      for (int i = 0; i < filename.length; i++) {
+         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i]);
+      }
+      if (hdfsScanRanges_.length > 0) {
+         currRange_ = 0;
+         currPos_ = hdfsScanRanges_[0].pos_;
+         lenRemain_ = hdfsScanRanges_[0].len_;
+         hdfsScanRange(0);
+      }
+      scanCompleted_ = false;
+      lastScanRangeScheduled_ = false;
+   }
+
+   public void hdfsScanRange(int bufNo) throws IOException
+   {
+      System.out.println (" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
+      int readLength;
+      if (lenRemain_ > bufLen_[bufNo])
+         readLength = bufLen_[bufNo];
+      else
+         readLength = (int)lenRemain_;
+      hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+      lenRemain_ -= readLength;
+      currPos_ += readLength;
+      if (lenRemain_ == 0) {
+         if (currRange_ == (hdfsScanRanges_.length-1))
+            lastScanRangeScheduled_ = true;
+         else {
+            currRange_++;
+            currPos_ = hdfsScanRanges_[currRange_].pos_;
+            lenRemain_ = hdfsScanRanges_[currRange_].len_;
+         }
+      }
+   }
+
+   public int[] trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   {
+      int[] retArray;
+      int byteCompleted;
+      int bufNo;
+
+      if (scanCompleted_)
+         return null;
+      retArray = new int[2];
+      switch (lastBufCompleted_) {
+         case -1:
+         case 1:
+            byteCompleted = hdfsClient_[0].trafHdfsReadBuffer();
+            bufNo = 0;
+            break;
+         case 0:
+            byteCompleted = hdfsClient_[1].trafHdfsReadBuffer();
+            bufNo = 1;
+            break;
+         default:
+            bufNo = -1;
+            byteCompleted = -1;
+      }
+      lastBufCompleted_ = bufNo;
+      retArray[0] = byteCompleted;
+      retArray[1] = bufNo;
+      System.out.println (" Buffer No " + retArray[1] + " Bytes Read " + retArray[0]);
+      lastBufCompleted_ = bufNo;
+      if (lastScanRangeScheduled_) {
+         scanCompleted_ = true;
+         return retArray;
+      }
+      switch (lastBufCompleted_)
+      {
+         case 0:
+            hdfsScanRange(1);
+            break;
+         case 1:
+            hdfsScanRange(0);
+            break;
+         default:
+            break;
+      }
+      return retArray;
+   }
+
+   public static void shutdown() throws InterruptedException
+   {
+      HDFSClient.shutdown();
+   }
+
+   public static void main(String[] args) throws Exception
+   {
+
+      if (args.length < 3)
+      {
+         System.out.println("Usage: org.trafodion.sql.HdfsScan <tableName> <buffer_length> <number_of_splits>");
+         return;
+      }
+      String tableName = args[0];
+      int capacity = Integer.parseInt(args[1]) * 1024 *1024;
+      int split = Integer.parseInt(args[2]);
+      HiveConf config = new HiveConf();
+      HiveMetaStoreClient hiveMeta = new HiveMetaStoreClient(config);
+      Table table = hiveMeta.getTable(tableName);
+      StorageDescriptor sd = table.getSd();
+      String location = sd.getLocation();
+      URI uri = new URI(location);
+      Path path = new Path(uri);
+      FileSystem fs = FileSystem.get(config);
+      FileStatus file_status[] = fs.listStatus(path);
+      ByteBuffer buf1 = ByteBuffer.allocateDirect(capacity);
+      ByteBuffer buf2 = ByteBuffer.allocateDirect(capacity);
+      String fileName[] = new String[file_status.length * split];
+      long pos[] = new long[file_status.length * split];
+      long len[] = new long[file_status.length * split];
+      for (int i = 0 ; i < file_status.length * split; i++) {
+         Path filePath = file_status[i].getPath();
+         long fileLen = file_status[i].getLen();
+         long splitLen = fileLen / split;
+         fileName[i] = filePath.toString();
+         System.out.println (" fileName " + fileName[i] + " Length " + fileLen);
+         long splitPos = 0;
+         for (int j = 0 ; j < split ; j++)
+         {
+            fileName[i] = filePath.toString();
+            pos[i] = splitPos + (splitLen * j);
+            len[i] = splitLen;
+            if (j == (split-1))
+               len[i] = fileLen - (splitLen *(j));
+            System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]);
+            i++;
+         }
+      }
+      long time1 = System.currentTimeMillis();
+      HdfsScan hdfsScan = new HdfsScan(buf1, buf2, fileName, pos, len);
+      int[] retArray;
+      int bytesCompleted;
+      while (true) {
+         retArray = hdfsScan.trafHdfsRead();
+         if (retArray == null)
+            break;
+      }
+      long time2 = System.currentTimeMillis();
+      HdfsScan.shutdown();
+      System.out.println("Time taken in milliSeconds " + (time2-time1) );
+   }
+}
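A minimal sketch of the double-buffer consumption loop described in the HdfsScan class comment, illustrative only and not part of this change: it is the loop from main() above with the Hive metastore setup stripped out. The class name HdfsScanLoopExample, the buffer sizes and the assumption that the caller supplies file names, offsets and lengths (in Trafodion these come from the ExHdfsScan operator) are all placeholders; the class is placed in package org.trafodion.sql because the HdfsScan constructor is package-private.

package org.trafodion.sql;

import java.nio.ByteBuffer;

public class HdfsScanLoopExample
{
   public static void scan(String[] files, long[] pos, long[] len) throws Exception
   {
      ByteBuffer buf1 = ByteBuffer.allocateDirect(8 * 1024 * 1024);
      ByteBuffer buf2 = ByteBuffer.allocateDirect(8 * 1024 * 1024);
      HdfsScan hdfsScan = new HdfsScan(buf1, buf2, files, pos, len);
      while (true) {
         // Waits for the buffer the IO thread is currently filling and,
         // before returning, schedules the next read into the other buffer.
         int[] ret = hdfsScan.trafHdfsRead();
         if (ret == null)           // all scan ranges have been read
            break;
         int bytesRead = ret[0];
         int bufNo = ret[1];
         ByteBuffer ready = (bufNo == 0) ? buf1 : buf2;
         // ... process bytesRead bytes from 'ready' here while the
         // background read fills the other buffer ...
      }
      HdfsScan.shutdown();
   }
}

The two-buffer design exists so that, per the class comment, the main thread can process one buffer while the single IO thread fills the other, overlapping HDFS reads with row processing.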
http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java b/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
index 2007005..ff88dd7 100644
--- a/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
+++ b/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
@@ -143,158 +143,4 @@ public class SequenceFileWriter {
     }
     return null;
   }
-  
-  boolean hdfsCreate(String fname , boolean compress) throws IOException
-  {
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - started" );
-    Path filePath = null;
-    if (!compress || (compress && fname.endsWith(".gz")))
-      filePath = new Path(fname);
-    else
-      filePath = new Path(fname + ".gz");
-    
-    fs = FileSystem.get(filePath.toUri(),conf);
-    fsOut = fs.create(filePath, true);
-    
-    outStream = fsOut;
-    
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - file created" );
-    if (compress)
-    {
-      GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, conf);
-      Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
-      outStream = gzipCodec.createOutputStream(fsOut, gzipCompressor);
-      sameStream = false;
-    }
-    
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - compressed output stream created" );
-    return true;
-  }
-  
-  boolean hdfsWrite(byte[] buff, long len) throws IOException
-  {
-
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - started" );
-    outStream.write(buff);
-    outStream.flush();
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - bytes written and flushed:" + len );
-    return true;
-  }
-  
-  boolean hdfsClose() throws IOException
-  {
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsClose() - started" );
-    if (sameStream) {
-      if (outStream != null) {
-        outStream.close();
-        outStream = null;
-      }
-      fsOut = null;
-    }
-    else {
-      if (outStream != null) {
-        outStream.close();
-        outStream = null;
-      }
-      if (fsOut != null) {
-        fsOut.close();
-        fsOut = null;
-      }
-    }
-    return true;
-  }
-
-  
-  public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
-  {
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - start");
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - source Path: " + srcPathStr +
-               ", destination File:" + dstPathStr );
-    Path srcPath = new Path(srcPathStr );
-    srcPath = srcPath.makeQualified(srcPath.toUri(), null);
-    FileSystem srcFs = FileSystem.get(srcPath.toUri(),conf);
-
-    Path dstPath = new Path(dstPathStr);
-    dstPath = dstPath.makeQualified(dstPath.toUri(), null);
-    FileSystem dstFs = FileSystem.get(dstPath.toUri(),conf);
-    
-    if (dstFs.exists(dstPath))
-    {
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - destination files exists" );
-      // for this prototype we just delete the file-- will change in next code drops
-      dstFs.delete(dstPath, false);
-      // The caller should already have checked existence of file-- throw exception
-      //throw new FileAlreadyExistsException(dstPath.toString());
-    }
-    
-    Path tmpSrcPath = new Path(srcPath, "tmp");
-
-    FileSystem.mkdirs(srcFs, tmpSrcPath,srcFs.getFileStatus(srcPath).getPermission());
-    logger.debug("SequenceFileWriter.hdfsMergeFiles() - tmp folder created." );
-    Path[] files = FileUtil.stat2Paths(srcFs.listStatus(srcPath));
-    for (Path f : files)
-    {
-      srcFs.rename(f, tmpSrcPath);
-    }
-    // copyMerge and use false for the delete option since it removes the whole directory
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - copyMerge" );
-    FileUtil.copyMerge(srcFs, tmpSrcPath, dstFs, dstPath, false, conf, null);
-    
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - delete intermediate files" );
-    srcFs.delete(tmpSrcPath, true);
-    return true;
-  }
-
-  public boolean hdfsCleanUnloadPath(String uldPathStr
-                                     /*, boolean checkExistence, String mergeFileStr*/) throws IOException
-  {
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - start");
-    logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - unload Path: " + uldPathStr );
-    
-    Path uldPath = new Path(uldPathStr );
-    uldPath = uldPath.makeQualified(uldPath.toUri(), null);
-    FileSystem srcFs = FileSystem.get(uldPath.toUri(),conf);
-    if (!srcFs.exists(uldPath))
-    {
-      //unload location does not exist. hdfscreate will create it later
-      //nothing to do
-      logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() -- unload location does not exist." );
-      return true;
-    }
-    
-    Path[] files = FileUtil.stat2Paths(srcFs.listStatus(uldPath));
-    logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - delete files" );
-    for (Path f : files){
-      srcFs.delete(f, false);
-    }
-    return true;
-  }
-
-  public boolean hdfsExists(String filePathStr) throws IOException
-  {
-    logger.debug("SequenceFileWriter.hdfsExists() - start");
-    logger.debug("SequenceFileWriter.hdfsExists() - Path: " + filePathStr);
-    
-    //check existence of the merge Path
-    Path filePath = new Path(filePathStr );
-    filePath = filePath.makeQualified(filePath.toUri(), null);
-    FileSystem mergeFs = FileSystem.get(filePath.toUri(),conf);
-    if (mergeFs.exists( filePath))
-    {
-      logger.debug("SequenceFileWriter.hdfsExists() - Path: "
-          + filePath + " exists" );
-      return true;
-    }
-    return false;
-  }
-  
-  public boolean hdfsDeletePath(String pathStr) throws IOException
-  {
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsDeletePath() - start - Path: " + pathStr);
-    Path delPath = new Path(pathStr );
-    delPath = delPath.makeQualified(delPath.toUri(), null);
-    FileSystem fs = FileSystem.get(delPath.toUri(),conf);
-    fs.delete(delPath, true);
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java b/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
index 80b9382..cf0cfa1 100644
--- a/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
+++ b/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
@@ -37,6 +37,7 @@ public class TrafConfiguration {
     Configuration lv_conf = new Configuration();
     switch (config) {
       case HBASE_CONF:
+      case HDFS_CONF:
         String trafSiteXml = new String(System.getenv("TRAF_CONF") + "/trafodion-site.xml");
         Path fileRes = new Path(trafSiteXml);
         lv_conf.addResource(fileRes);
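The one-line TrafConfiguration change above lets HDFS-side callers pick up trafodion-site.xml the same way the HBase configuration path already does; the HDFSClient static initializer shown earlier is the first such caller. A minimal sketch of that call pattern, illustrative only and not part of this change (the class name HdfsConfExample is a placeholder, and TRAF_CONF is assumed to be set):

package org.trafodion.sql;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsConfExample
{
   public static void main(String[] args) throws Exception
   {
      // Returns a Hadoop Configuration with $TRAF_CONF/trafodion-site.xml merged in.
      Configuration conf = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
      FileSystem fs = FileSystem.get(conf);   // default file system from the merged config
      System.out.println("default file system: " + fs.getUri());
   }
}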
