http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java new file mode 100644 index 0000000..df73448 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -0,0 +1,854 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.Bytes; + +import java.io.IOException; +import java.text.NumberFormat; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class FileStorageManager extends StorageManager { + private final Log LOG = LogFactory.getLog(FileStorageManager.class); + + static final String OUTPUT_FILE_PREFIX="part-"; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(6); + return fmt; + } + }; + + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + 
fmt.setMinimumIntegerDigits(3); + return fmt; + } + }; + + protected FileSystem fs; + protected Path tableBaseDir; + protected boolean blocksMetadataEnabled; + private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); + + public FileStorageManager(StoreType storeType) { + super(storeType); + } + + @Override + protected void storageInit() throws IOException { + this.tableBaseDir = TajoConf.getWarehouseDir(conf); + this.fs = tableBaseDir.getFileSystem(conf); + this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + if (!this.blocksMetadataEnabled) + LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) + throws IOException { + FileSystem fs = path.getFileSystem(conf); + FileStatus status = fs.getFileStatus(path); + return getFileScanner(meta, schema, path, status); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) + throws IOException { + Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + return getScanner(meta, schema, fragment); + } + + public FileSystem getFileSystem() { + return this.fs; + } + + public Path getWarehouseDir() { + return this.tableBaseDir; + } + + public void delete(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + fs.delete(tablePath, true); + } + + public boolean exists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + return fileSystem.exists(path); + } + + /** + * This method deletes only data contained in the given path. + * + * @param path The path in which data are deleted. 
+ * @throws IOException + */ + public void deleteData(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + FileStatus[] fileLists = fileSystem.listStatus(path); + for (FileStatus status : fileLists) { + fileSystem.delete(status.getPath(), true); + } + } + + public Path getTablePath(String tableName) { + return new Path(tableBaseDir, tableName); + } + + @VisibleForTesting + public Appender getAppender(TableMeta meta, Schema schema, Path filePath) + throws IOException { + return getAppender(null, null, meta, schema, filePath); + } + + public FileFragment[] split(String tableName) throws IOException { + Path tablePath = new Path(tableBaseDir, tableName); + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, long fragmentSize) throws IOException { + Path tablePath = new Path(tableBaseDir, tableName); + return split(tableName, tablePath, fragmentSize); + } + + public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen()); + listTablets.add(tablet); + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public FileFragment[] split(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, Path tablePath) throws IOException { + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + private FileFragment[] split(String tableName, Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, + Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += 
defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public long calculateSize(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + long totalSize = 0; + + if (fs.exists(tablePath)) { + totalSize = fs.getContentSummary(tablePath).getLength(); + } + + return totalSize; + } + + ///////////////////////////////////////////////////////////////////////////// + // FileInputFormat Area + ///////////////////////////////////////////////////////////////////////////// + + public static final PathFilter hiddenFileFilter = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) { + if (taskAttemptId == null) { + // For testcase + return workDir; + } + // The final result of a task will be written in a file named part-ss-nnnnnnn, + // where ss is the subquery id associated with this task, and nnnnnn is the task id. + Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, + OUTPUT_FILE_PREFIX + + OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + + OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" + + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); + LOG.info("Output File Path: " + outFilePath); + + return outFilePath; + } + + /** + * Proxy PathFilter that accepts a path only if all filters given in the + * constructor do. Used by the listPaths() to apply the built-in + * hiddenFileFilter together with a user provided one (if any). + */ + private static class MultiPathFilter implements PathFilter { + private List<PathFilter> filters; + + public MultiPathFilter(List<PathFilter> filters) { + this.filters = filters; + } + + public boolean accept(Path path) { + for (PathFilter filter : filters) { + if (!filter.accept(path)) { + return false; + } + } + return true; + } + } + + /** + * List input directories. + * Subclasses may override to, e.g., select only files matching a regular + * expression. + * + * @return array of FileStatus objects + * @throws IOException if zero items. + */ + protected List<FileStatus> listStatus(Path... dirs) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + + List<IOException> errors = new ArrayList<IOException>(); + + // creates a MultiPathFilter with the hiddenFileFilter and the + // user provided one (if any). 
+ List<PathFilter> filters = new ArrayList<PathFilter>(); + filters.add(hiddenFileFilter); + + PathFilter inputFilter = new MultiPathFilter(filters); + + for (int i = 0; i < dirs.length; ++i) { + Path p = dirs[i]; + + FileSystem fs = p.getFileSystem(conf); + FileStatus[] matches = fs.globStatus(p, inputFilter); + if (matches == null) { + errors.add(new IOException("Input path does not exist: " + p)); + } else if (matches.length == 0) { + errors.add(new IOException("Input Pattern " + p + " matches 0 files")); + } else { + for (FileStatus globStat : matches) { + if (globStat.isDirectory()) { + for (FileStatus stat : fs.listStatus(globStat.getPath(), + inputFilter)) { + result.add(stat); + } + } else { + result.add(globStat); + } + } + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result; + } + + /** + * Is the given file splittable? Usually true, but if the file is + * stream-compressed, it will not be. + * <p/> + * <code>FileInputFormat</code> implementations can override this and return + * <code>false</code> to ensure that individual input files are never split up, + * so that Mappers process entire files. + * + * @param meta the table meta + * @param schema the table schema + * @param path the file name to check + * @param status the file status, used to obtain the file length + * @return whether this file is splittable + */ + protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { + Scanner scanner = getFileScanner(meta, schema, path, status); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + private static final double SPLIT_SLOP = 1.1; // 10% slop + + protected int getBlockIndex(BlockLocation[] blkLocations, + long offset) { + for (int i = 0; i < blkLocations.length; i++) { + // is the offset inside this block? + if ((blkLocations[i].getOffset() <= offset) && + (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1; + throw new IllegalArgumentException("Offset " + offset + + " is outside of file (0.." + + fileLength + ")"); + } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types. + */ + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { + return new FileFragment(fragmentId, file, start, length); + } + + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, + String[] hosts) { + return new FileFragment(fragmentId, file, start, length, hosts); + } + + protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) + throws IOException { + return new FileFragment(fragmentId, file, blockLocation); + } + + // for non-splittable files,
e.g., a gzip-compressed text file + protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations) throws IOException { + + Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet()); + Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { + + @Override + public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { + return v1.getValue().compareTo(v2.getValue()); + } + }); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + return new FileFragment(fragmentId, file, start, length, hosts); + } + + /** + * Get the minimum split size. + * + * @return the minimum number of bytes that can be in a split + */ + public long getMinSplitSize() { + return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); + } + + /** + * Get disk ids from the given volume ids; -1 denotes an unknown volume. + */ + private int[] getDiskIds(VolumeId[] volumeIds) { + int[] diskIds = new int[volumeIds.length]; + for (int i = 0; i < volumeIds.length; i++) { + int diskId = -1; + if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { + diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); + } + diskIds[i] = diskId; + } + return diskIds; + } + + /** + * Generate a map from each host to the set of volume (disk) ids it serves. + */ + private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) { + Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>(); + for (FileFragment frag : frags) { + String[] hosts = frag.getHosts(); + int[] diskIds = frag.getDiskIds(); + for (int i = 0; i < hosts.length; i++) { + Set<Integer> volumeList = volumeMap.get(hosts[i]); + if (volumeList == null) { + volumeList = new HashSet<Integer>(); + volumeMap.put(hosts[i], volumeList); + } + + if (diskIds.length > 0 && diskIds[i] > -1) { + volumeList.add(diskIds[i]); + } + } + } + + return volumeMap; + } + + /** + * Generate the list of files and make them into FileSplits. + * + * @throws IOException + */ + public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path...
inputs) + throws IOException { + // generate splits + + List<Fragment> splits = Lists.newArrayList(); + List<Fragment> volumeSplits = Lists.newArrayList(); + List<BlockLocation> blockLocations = Lists.newArrayList(); + + for (Path p : inputs) { + FileSystem fs = p.getFileSystem(conf); + ArrayList<FileStatus> files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + int previousSplitSize = splits.size(); + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittable(meta, schema, path, file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + + } else { + if (splittable) { + + long minSize = Math.max(getMinSplitSize(), 1); + + long blockSize = file.getBlockSize(); // the S3N REST API reports a block size, but returns only a single block location + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; + + // chunk the file by splitSize (needed for file systems such as S3 that report a single location) + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts())); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts())); + } + } else { // Non splittable + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + } else { + // for zero-length files + splits.add(makeSplit(tableName, path, 0, length)); + } + } + if(LOG.isDebugEnabled()){ + LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); + } + } + + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); + LOG.info("Total # of splits: " + splits.size()); + return splits; + } + + private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) + throws IOException { + + int locationSize = blockLocations.size(); + int splitSize = splits.size(); + if (locationSize == 0 || splitSize == 0) return; + + if (locationSize != splitSize) { + // splits and locations don't match up + LOG.warn("Number of block locations not equal to number of splits: " + "#locations=" + locationSize + " #splits=" + splitSize); + return; + } + + DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf); + int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); + int blockLocationIdx = 0; + + Iterator<Fragment> iter = splits.iterator(); + while (locationSize > blockLocationIdx) { + +
int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); + List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); + // BlockStorageLocation contains additional volume location information for each replica of each block. + BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); + + for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { + ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); + blockLocationIdx++; + } + } + LOG.info("# of splits with volumeId " + splitSize); + } + + private static class InvalidInputException extends IOException { + List<IOException> errors; + public InvalidInputException(List<IOException> errors) { + this.errors = errors; + } + + @Override + public String getMessage(){ + StringBuilder sb = new StringBuilder(); + int messageLimit = Math.min(errors.size(), 10); + for (int i = 0; i < messageLimit; i++) { + sb.append(errors.get(i).getMessage()).append("\n"); + } + + if(messageLimit < errors.size()) + sb.append("skipped .....").append("\n"); + + return sb.toString(); + } + } + + @Override + public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException { + return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath())); + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + if (!tableDesc.isExternal()) { + String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); + String databaseName = splitted[0]; + String simpleTableName = splitted[1]; + + // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) + Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName); + tableDesc.setPath(tablePath.toUri()); + } else { + Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given."); + } + + Path path = new Path(tableDesc.getPath()); + + FileSystem fs = path.getFileSystem(conf); + TableStats stats = new TableStats(); + if (tableDesc.isExternal()) { + if (!fs.exists(path)) { + LOG.error(path.toUri() + " does not exist"); + throw new IOException("ERROR: " + path.toUri() + " does not exist"); + } + } else { + fs.mkdirs(path); + } + + long totalSize = 0; + + try { + totalSize = calculateSize(path); + } catch (IOException e) { + LOG.warn("Cannot calculate the size of the relation", e); + } + + stats.setNumBytes(totalSize); + + if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + } + + tableDesc.setStats(stats); + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + try { + Path path = new Path(tableDesc.getPath()); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Delete table data dir: " + path); + fs.delete(path, true); + } catch (IOException e) { + throw e; + } + } + + @Override + public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException { + // List only table data files that are not empty. + // If the table is partitioned, return the list of files that share the same partition key.
+ Path tablePath = new Path(tableDesc.getPath()); + FileSystem fs = tablePath.getFileSystem(conf); + + List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); + if (fs.exists(tablePath)) { + getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numFragments, + new AtomicInteger(0)); + } + + List<Fragment> fragments = new ArrayList<Fragment>(); + + // In the case of a partitioned table, return only data files that share the same partition key. + int numPartitionColumns = 0; + if (tableDesc.hasPartition()) { + numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); + } + String[] previousPartitionPathNames = null; + for (FileStatus eachFile: nonZeroLengthFiles) { + FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); + + if (numPartitionColumns > 0) { + // find the partition key + Path filePath = fileFragment.getPath(); + Path parentPath = filePath; + String[] parentPathNames = new String[numPartitionColumns]; + for (int i = 0; i < numPartitionColumns; i++) { + parentPath = parentPath.getParent(); + parentPathNames[numPartitionColumns - i - 1] = parentPath.getName(); + } + + // If current partitionKey == previousPartitionKey, add to result. + if (previousPartitionPathNames == null) { + fragments.add(fileFragment); + } else if (Arrays.equals(previousPartitionPathNames, parentPathNames)) { + fragments.add(fileFragment); + } else { + break; + } + previousPartitionPathNames = parentPathNames; + } else { + fragments.add(fileFragment); + } + } + + return fragments; + } + + private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result, + int startFileIndex, int numResultFiles, + AtomicInteger currentFileIndex) throws IOException { + if (fs.isDirectory(path)) { + FileStatus[] files = fs.listStatus(path, FileStorageManager.hiddenFileFilter); + if (files != null && files.length > 0) { + for (FileStatus eachFile : files) { + if (result.size() >= numResultFiles) { + return; + } + if (eachFile.isDirectory()) { + getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles, + currentFileIndex); + } else if (eachFile.isFile() && eachFile.getLen() > 0) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(eachFile); + } + currentFileIndex.incrementAndGet(); + } + } + } + } else { + FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus != null && fileStatus.getLen() > 0) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(fileStatus); + } + currentFileIndex.incrementAndGet(); + if (result.size() >= numResultFiles) { + return; + } + } + } + } + + @Override + public StorageProperty getStorageProperty() { + StorageProperty storageProperty = new StorageProperty(); + storageProperty.setSortedInsert(false); + if (storeType == StoreType.RAW) { + storageProperty.setSupportsInsertInto(false); + } else { + storageProperty.setSupportsInsertInto(true); + } + + return storageProperty; + } + + @Override + public void closeStorageManager() { + } + + @Override + public void beforeInsertOrCATS(LogicalNode node) throws IOException { + } + + @Override + public void rollbackOutputCommit(LogicalNode node) throws IOException { + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) + throws IOException { + return null; + } + + /** + * Returns Scanner instance.
+ * + * @param conf The system configuration + * @param meta The table meta + * @param schema The input schema + * @param path The data file path + * @return Scanner instance + * @throws java.io.IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { + + FileSystem fs = path.getFileSystem(conf); + FileStatus status = fs.getFileStatus(path); + FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + + return getSeekableScanner(conf, meta, schema, fragment, schema); + } +}
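
For reference, a minimal usage sketch of the split API defined above (illustrative only, not part of this commit: the table name "default.t1", the single TEXT column, and the CSV store type are assumptions; CatalogUtil.newTableMeta and Schema.addColumn are Tajo catalog APIs as of this commit):

  // Enumerate fragments for an assumed CSV table under the warehouse directory.
  TajoConf tajoConf = new TajoConf();
  FileStorageManager sm =
      (FileStorageManager) StorageManager.getFileStorageManager(tajoConf, null);

  Schema schema = new Schema();
  schema.addColumn("name", org.apache.tajo.common.TajoDataTypes.Type.TEXT); // assumed column
  TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);

  // Splittable formats yield roughly one fragment per HDFS block; non-splittable
  // files (e.g. gzip) come back as a single fragment built by makeNonSplit().
  List<Fragment> splits = sm.getSplits("default.t1", meta, schema, sm.getTablePath("default/t1"));
  for (Fragment f : splits) {
    System.out.println(f);
  }
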
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java new file mode 100644 index 0000000..33b2750 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.util.Pair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HashShuffleAppender implements Appender { + private static Log LOG = LogFactory.getLog(HashShuffleAppender.class); + + private FileAppender appender; + private AtomicBoolean closed = new AtomicBoolean(false); + private int partId; + + private TableStats tableStats; + + //<taskId,<page start offset,<task start, task end>>> + private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; + + //page start offset, length + private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + + private Pair<Long, Integer> currentPage; + + private int pageSize; // in bytes (configured in MB via SHUFFLE_HASH_APPENDER_PAGE_VOLUME) + + private int rowNumInPage; + + private int totalRows; + + private long offset; + + private ExecutionBlockId ebId; + + public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) { + this.ebId = ebId; + this.partId = partId; + this.appender = appender; + this.pageSize = pageSize; + } + + @Override + public void init() throws IOException { + currentPage = new Pair<Long, Integer>(0L, 0); + taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>(); + rowNumInPage = 0; + } + + /** + * Write multiple tuples. Each tuple is written by the FileAppender responsible for the specified partition. + * After writing, if the current page exceeds pageSize, a new page offset is recorded.
+ * @param taskId the task attempt writing these tuples + * @param tuples the tuples to write + * @return written bytes + * @throws java.io.IOException + */ + public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException { + synchronized(appender) { + if (closed.get()) { + return 0; + } + long currentPos = appender.getOffset(); + + for (Tuple eachTuple: tuples) { + appender.addTuple(eachTuple); + } + long posAfterWritten = appender.getOffset(); + + int writtenBytes = (int)(posAfterWritten - currentPos); + + int nextRowNum = rowNumInPage + tuples.size(); + List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId); + if (taskIndexes == null) { + taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); + taskTupleIndexes.put(taskId, taskIndexes); + } + taskIndexes.add( + new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair<Integer, Integer>(rowNumInPage, nextRowNum))); + rowNumInPage = nextRowNum; + + if (posAfterWritten - currentPage.getFirst() > pageSize) { + nextPage(posAfterWritten); + rowNumInPage = 0; + } + + totalRows += tuples.size(); + return writtenBytes; + } + } + + public long getOffset() throws IOException { + if (closed.get()) { + return offset; + } else { + return appender.getOffset(); + } + } + + private void nextPage(long pos) { + currentPage.setSecond((int) (pos - currentPage.getFirst())); + pages.add(currentPage); + currentPage = new Pair<Long, Integer>(pos, 0); + } + + @Override + public void addTuple(Tuple t) throws IOException { + throw new IOException("addTuple() is not supported; use addTuples() instead"); + } + + @Override + public void flush() throws IOException { + synchronized(appender) { + if (closed.get()) { + return; + } + appender.flush(); + } + } + + @Override + public long getEstimatedOutputSize() throws IOException { + return ((long) pageSize) * pages.size(); + } + + @Override + public void close() throws IOException { + synchronized(appender) { + if (closed.get()) { + return; + } + appender.flush(); + offset = appender.getOffset(); + if (offset > currentPage.getFirst()) { + nextPage(offset); + } + appender.close(); + if (LOG.isDebugEnabled()) { + if (!pages.isEmpty()) { + LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() + + ", lastPage=" + pages.get(pages.size() - 1)); + } else { + LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); + } + } + closed.set(true); + tableStats = appender.getStats(); + } + } + + @Override + public void enableStats() { + } + + @Override + public TableStats getStats() { + synchronized(appender) { + return appender.getStats(); + } + } + + public List<Pair<Long, Integer>> getPages() { + return pages; + } + + public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() { + return taskTupleIndexes; + } + + public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() { + List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); + + for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) { + merged.addAll(eachFailureIndex); + } + + return merged; + } + + public void taskFinished(QueryUnitAttemptId taskId) { + taskTupleIndexes.remove(taskId); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java new file mode 100644 index 0000000..636ae0f --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.util.Pair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HashShuffleAppenderManager { + private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class); + + private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = + new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>(); + private TajoConf systemConf; + private FileSystem defaultFS; + private FileSystem localFS; + private LocalDirAllocator lDirAllocator; + private int pageSize; + + public HashShuffleAppenderManager(TajoConf systemConf) throws IOException { + this.systemConf = systemConf; + + // initialize LocalDirAllocator + lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); + + // initialize DFS and LocalFileSystems + defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); + localFS = FileSystem.getLocal(systemConf); + pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024; + } + + public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId, + TableMeta meta, Schema outSchema) throws IOException { + synchronized (appenderMap) { + Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId); + + if (partitionAppenderMap == null) { + partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>(); + appenderMap.put(ebId, partitionAppenderMap); + } + + PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId); + if (partitionAppenderMeta == null) { + Path dataFile = getDataFile(ebId, partId); + FileSystem fs = 
dataFile.getFileSystem(systemConf); + if (fs.exists(dataFile)) { + FileStatus status = fs.getFileStatus(dataFile); + LOG.info("File " + dataFile + " already exists, size=" + status.getLen()); + } + + if (!fs.exists(dataFile.getParent())) { + fs.mkdirs(dataFile.getParent()); + } + FileAppender appender = (FileAppender)((FileStorageManager)StorageManager.getFileStorageManager( + tajoConf, null)).getAppender(meta, outSchema, dataFile); + appender.enableStats(); + appender.init(); + + partitionAppenderMeta = new PartitionAppenderMeta(); + partitionAppenderMeta.partId = partId; + partitionAppenderMeta.dataFile = dataFile; + partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender); + partitionAppenderMeta.appender.init(); + partitionAppenderMap.put(partId, partitionAppenderMeta); + + LOG.info("Created hash shuffle file (partId=" + partId + "): " + dataFile); + } + + return partitionAppenderMeta.appender; + } + } + + public static int getPartParentId(int partId, TajoConf tajoConf) { + return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS); + } + + private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException { + try { + // the base dir for this execution block's output + String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; + Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf)); + //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")"); + + // If an EB has many partitions, too many shuffle files would land in a single directory, + // so spread them across parent directories. + return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e); + } + } + + public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException { + Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null; + synchronized (appenderMap) { + partitionAppenderMap = appenderMap.remove(ebId); + } + + if (partitionAppenderMap == null) { + LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle"); + return null; + } + + // Send intermediate data to the QueryMaster.
+ List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>(); + for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) { + try { + eachMeta.appender.close(); + HashShuffleIntermediate intermediate = + new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(), + eachMeta.appender.getPages(), + eachMeta.appender.getMergedTupleIndexes()); + intermEntries.add(intermediate); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + throw e; + } + } + + LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size()); + + return intermEntries; + } + + public void finalizeTask(QueryUnitAttemptId taskId) { + synchronized (appenderMap) { + Map<Integer, PartitionAppenderMeta> partitionAppenderMap = + appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId()); + if (partitionAppenderMap == null) { + return; + } + + for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) { + eachAppender.appender.taskFinished(taskId); + } + } + } + + public static class HashShuffleIntermediate { + private int partId; + + private long volume; + + //[<page start offset,<task start, task end>>] + private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes; + + //[<page start offset, length>] + private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + + public HashShuffleIntermediate(int partId, long volume, + List<Pair<Long, Integer>> pages, + Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) { + this.partId = partId; + this.volume = volume; + this.failureTskTupleIndexes = failureTskTupleIndexes; + this.pages = pages; + } + + public int getPartId() { + return partId; + } + + public long getVolume() { + return volume; + } + + public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() { + return failureTskTupleIndexes; + } + + public List<Pair<Long, Integer>> getPages() { + return pages; + } + } + + static class PartitionAppenderMeta { + int partId; + HashShuffleAppender appender; + Path dataFile; + + public int getPartId() { + return partId; + } + + public HashShuffleAppender getAppender() { + return appender; + } + + public Path getDataFile() { + return dataFile; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java new file mode 100644 index 0000000..0f31baf --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java @@ -0,0 +1,559 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +/** + * A class that provides a line reader from an input stream. + * Depending on the constructor used, lines will either be terminated by: + * <ul> + * <li>one of the following: '\n' (LF) , '\r' (CR), + * or '\r\n' (CR+LF).</li> + * <li><em>or</em>, a custom byte sequence delimiter</li> + * </ul> + * In both cases, EOF also terminates an otherwise unterminated + * line. + */ + +public class LineReader implements Closeable { + private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + private int bufferSize = DEFAULT_BUFFER_SIZE; + private InputStream in; + private byte[] buffer; + // the number of bytes of real data in the buffer + private int bufferLength = 0; + // the current position in the buffer + private int bufferPosn = 0; + + private static final byte CR = '\r'; + private static final byte LF = '\n'; + + // The line delimiter + private final byte[] recordDelimiterBytes; + + /** + * Create a line reader that reads from the given stream using the + * default buffer-size (64k). + * + * @param in The input stream + * @throws java.io.IOException + */ + public LineReader(InputStream in) { + this(in, DEFAULT_BUFFER_SIZE); + } + + /** + * Create a line reader that reads from the given stream using the + * given buffer-size. + * + * @param in The input stream + * @param bufferSize Size of the read buffer + * @throws java.io.IOException + */ + public LineReader(InputStream in, int bufferSize) { + this.in = in; + this.bufferSize = bufferSize; + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = null; + } + + /** + * Create a line reader that reads from the given stream using the + * <code>io.file.buffer.size</code> specified in the given + * <code>Configuration</code>. + * + * @param in input stream + * @param conf configuration + * @throws java.io.IOException + */ + public LineReader(InputStream in, Configuration conf) throws IOException { + this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); + } + + /** + * Create a line reader that reads from the given stream using the + * default buffer-size, and using a custom delimiter of array of + * bytes. + * + * @param in The input stream + * @param recordDelimiterBytes The delimiter + */ + public LineReader(InputStream in, byte[] recordDelimiterBytes) { + this.in = in; + this.bufferSize = DEFAULT_BUFFER_SIZE; + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = recordDelimiterBytes; + } + + /** + * Create a line reader that reads from the given stream using the + * given buffer-size, and using a custom delimiter of array of + * bytes. 
+ * + * @param in The input stream + * @param bufferSize Size of the read buffer + * @param recordDelimiterBytes The delimiter + * @throws java.io.IOException + */ + public LineReader(InputStream in, int bufferSize, + byte[] recordDelimiterBytes) { + this.in = in; + this.bufferSize = bufferSize; + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = recordDelimiterBytes; + } + + /** + * Create a line reader that reads from the given stream using the + * <code>io.file.buffer.size</code> specified in the given + * <code>Configuration</code>, and using a custom delimiter of array of + * bytes. + * + * @param in input stream + * @param conf configuration + * @param recordDelimiterBytes The delimiter + * @throws java.io.IOException + */ + public LineReader(InputStream in, Configuration conf, + byte[] recordDelimiterBytes) throws IOException { + this.in = in; + this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE); + this.buffer = new byte[this.bufferSize]; + this.recordDelimiterBytes = recordDelimiterBytes; + } + + + /** + * Close the underlying stream. + * + * @throws java.io.IOException + */ + public void close() throws IOException { + in.close(); + } + + public void reset() { + bufferLength = 0; + bufferPosn = 0; + + } + + /** + * Read one line from the InputStream into the given Text. + * + * @param str the object to store the given line (without newline) + * @param maxLineLength the maximum number of bytes to store into str; + * the rest of the line is silently discarded. + * @param maxBytesToConsume the maximum number of bytes to consume + * in this call. This is only a hint, because if the line cross + * this threshold, we allow it to happen. It can overshoot + * potentially by as much as one buffer length. + * @return the number of bytes read including the (longest) newline + * found. + * @throws java.io.IOException if the underlying stream throws + */ + public int readLine(Text str, int maxLineLength, + int maxBytesToConsume) throws IOException { + if (this.recordDelimiterBytes != null) { + return readCustomLine(str, maxLineLength, maxBytesToConsume); + } else { + return readDefaultLine(str, maxLineLength, maxBytesToConsume); + } + } + + protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter) + throws IOException { + return in.read(buffer); + } + /** + * Read a line terminated by one of CR, LF, or CRLF. + */ + private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) + throws IOException { + /* We're reading data from in, but the head of the stream may be + * already buffered in buffer, so we have several cases: + * 1. No newline characters are in the buffer, so we need to copy + * everything and read another buffer from the stream. + * 2. An unambiguously terminated line is in buffer, so we just + * copy to str. + * 3. Ambiguously terminated line is in buffer, i.e. buffer ends + * in CR. In this case we copy everything up to CR to str, but + * we also need to see what follows CR: if it's LF, then we + * need consume LF as well, so next call to readLine will read + * from after that. + * We use a flag prevCharCR to signal if previous character was CR + * and, if it happens to be at the end of the buffer, delay + * consuming it until we have a chance to look at the char that + * follows. 
+ */ + str.clear(); + int txtLength = 0; //tracks str.getLength(), as an optimization + int newlineLength = 0; //length of terminating newline + boolean prevCharCR = false; //true if prev char was CR + long bytesConsumed = 0; + do { + int startPosn = bufferPosn; //starting from where we left off the last time + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + if (prevCharCR) { + ++bytesConsumed; //account for CR from previous read + } + bufferLength = fillBuffer(in, buffer, prevCharCR); + if (bufferLength <= 0) { + break; // EOF + } + } + for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline + if (buffer[bufferPosn] == LF) { + newlineLength = (prevCharCR) ? 2 : 1; + ++bufferPosn; // at next invocation proceed from following byte + break; + } + if (prevCharCR) { //CR + notLF, we are at notLF + newlineLength = 1; + break; + } + prevCharCR = (buffer[bufferPosn] == CR); + } + int readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; //CR at the end of the buffer + } + bytesConsumed += readLength; + int appendLength = readLength - newlineLength; + if (appendLength > maxLineLength - txtLength) { + appendLength = maxLineLength - txtLength; + } + if (appendLength > 0) { + str.append(buffer, startPosn, appendLength); + txtLength += appendLength; + } + } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); + + if (bytesConsumed > (long) Integer.MAX_VALUE) { + throw new IOException("Too many bytes before newline: " + bytesConsumed); + } + return (int) bytesConsumed; + } + + /** + * Read a line terminated by one of CR, LF, or CRLF. + */ + public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength + , int maxBytesToConsume) + throws IOException { + /* We're reading data from in, but the head of the stream may be + * already buffered in buffer, so we have several cases: + * 1. No newline characters are in the buffer, so we need to copy + * everything and read another buffer from the stream. + * 2. An unambiguously terminated line is in buffer, so we just + * copy to str. + * 3. Ambiguously terminated line is in buffer, i.e. buffer ends + * in CR. In this case we copy everything up to CR to str, but + * we also need to see what follows CR: if it's LF, then we + * need to consume LF as well, so the next call to readLine will read + * from after that. + * We use a flag prevCharCR to signal if the previous character was CR + * and, if it happens to be at the end of the buffer, delay + * consuming it until we have a chance to look at the char that + * follows. + */ + + int txtLength = 0; //tracks str.getLength(), as an optimization + int newlineLength = 0; //length of terminating newline + boolean prevCharCR = false; //true if prev char was CR + long bytesConsumed = 0; + do { + int startPosn = bufferPosn; //starting from where we left off the last time + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + if (prevCharCR) { + ++bytesConsumed; //account for CR from previous read + } + bufferLength = fillBuffer(in, buffer, prevCharCR); + if (bufferLength <= 0) { + break; // EOF + } + } + for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline + if (buffer[bufferPosn] == LF) { + newlineLength = (prevCharCR) ?
2 : 1; + ++bufferPosn; // at next invocation proceed from following byte + break; + } + if (prevCharCR) { //CR + notLF, we are at notLF + newlineLength = 1; + break; + } + prevCharCR = (buffer[bufferPosn] == CR); + } + int readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; //CR at the end of the buffer + } + bytesConsumed += readLength; + int appendLength = readLength - newlineLength; + if (appendLength > maxLineLength - txtLength) { + appendLength = maxLineLength - txtLength; + } + if (appendLength > 0) { + str.write(buffer, startPosn, appendLength); + txtLength += appendLength; + } + } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); + + if (bytesConsumed > (long) Integer.MAX_VALUE) { + throw new IOException("Too many bytes before newline: " + bytesConsumed); + } + + if (bytesConsumed > 0) offsets.add(txtLength); + return (int) bytesConsumed; + } + + /** + * Read a line terminated by one of CR, LF, or CRLF. + */ + +/* int validIdx = 0; + public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets, + long pos, int maxLineLength, int maxBytesToConsume) + throws IOException { + *//* We're reading data from in, but the head of the stream may be + * already buffered in buffer, so we have several cases: + * 1. No newline characters are in the buffer, so we need to copy + * everything and read another buffer from the stream. + * 2. An unambiguously terminated line is in buffer, so we just + * copy to str. + * 3. Ambiguously terminated line is in buffer, i.e. buffer ends + * in CR. In this case we copy everything up to CR to str, but + * we also need to see what follows CR: if it's LF, then we + * need consume LF as well, so next call to readLine will read + * from after that. + * We use a flag prevCharCR to signal if previous character was CR + * and, if it happens to be at the end of the buffer, delay + * consuming it until we have a chance to look at the char that + * follows. + *//* + //str.clear(); + str.reset(); + offsets.clear(); + foffsets.clear(); + + validIdx = 0; + long bufferBytesConsumed = 0; + + int txtLength = 0; //tracks str.getLength(), as an optimization + int newlineLength = 0; //length of terminating newline + boolean prevCharCR = false; //true of prev char was CR + long bytesConsumed = 0; + do { + + int startPosn = bufferPosn; //starting from where we left off the last time + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + if (prevCharCR) { + ++bytesConsumed; //account for CR from previous read + } + bufferLength = in.read(buffer); + if (bufferLength <= 0) { + break; // EOF + } + } + for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline + if (buffer[bufferPosn] == LF) { + newlineLength = (prevCharCR) ? 
2 : 1; + ++bufferPosn; // at next invocation proceed from following byte + break; + } + if (prevCharCR) { //CR + notLF, we are at notLF + newlineLength = 1; + break; + } + prevCharCR = (buffer[bufferPosn] == CR); + } + int readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; //CR at the end of the buffer + } + bytesConsumed += readLength; + int appendLength = readLength - newlineLength; + if (appendLength > maxLineLength - txtLength) { + appendLength = maxLineLength - txtLength; + } + + if (appendLength > 0) { + str.write(buffer, startPosn, appendLength); + //System.out.println(startPosn + "," + appendLength); + //str.append(buffer, startPosn, appendLength); + txtLength += appendLength; + } + + if(newlineLength > 0){ + validIdx++; + + if (bytesConsumed > (long)Integer.MAX_VALUE) { + throw new IOException("Too many bytes before newline: " + bytesConsumed); + } + offsets.add(txtLength); + foffsets.add(pos); + pos+= bytesConsumed; + bufferBytesConsumed += bytesConsumed; + + txtLength = 0; + newlineLength = 0; + prevCharCR = false; //true of prev char was CR + bytesConsumed = 0; + } else { + bufferBytesConsumed += bytesConsumed; + bytesConsumed = 0; + } + } while ((bufferBytesConsumed < 256 * 1024)); + + return (int)bufferBytesConsumed; + }*/ + + /** + * Read a line terminated by a custom delimiter. + */ + private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) + throws IOException { + /* We're reading data from inputStream, but the head of the stream may be + * already captured in the previous buffer, so we have several cases: + * + * 1. The buffer tail does not contain any character sequence which + * matches with the head of delimiter. We count it as a + * ambiguous byte count = 0 + * + * 2. The buffer tail contains a X number of characters, + * that forms a sequence, which matches with the + * head of delimiter. We count ambiguous byte count = X + * + * // *** eg: A segment of input file is as follows + * + * " record 1792: I found this bug very interesting and + * I have completely read about it. record 1793: This bug + * can be solved easily record 1794: This ." + * + * delimiter = "record"; + * + * supposing:- String at the end of buffer = + * "I found this bug very interesting and I have completely re" + * There for next buffer = "ad about it. record 179 ...." + * + * The matching characters in the input + * buffer tail and delimiter head = "re" + * Therefore, ambiguous byte count = 2 **** // + * + * 2.1 If the following bytes are the remaining characters of + * the delimiter, then we have to capture only up to the starting + * position of delimiter. That means, we need not include the + * ambiguous characters in str. + * + * 2.2 If the following bytes are not the remaining characters of + * the delimiter ( as mentioned in the example ), + * then we have to include the ambiguous characters in str. 
+ */ + str.clear(); + int txtLength = 0; // tracks str.getLength(), as an optimization + long bytesConsumed = 0; + int delPosn = 0; + int ambiguousByteCount = 0; // To capture the ambiguous characters count + do { + int startPosn = bufferPosn; // Start from previous end position + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0); + if (bufferLength <= 0) { + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + break; // EOF + } + } + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) { + delPosn++; + if (delPosn >= recordDelimiterBytes.length) { + bufferPosn++; + break; + } + } else if (delPosn != 0) { + bufferPosn--; + delPosn = 0; + } + } + int readLength = bufferPosn - startPosn; + bytesConsumed += readLength; + int appendLength = readLength - delPosn; + if (appendLength > maxLineLength - txtLength) { + appendLength = maxLineLength - txtLength; + } + if (appendLength > 0) { + if (ambiguousByteCount > 0) { + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + //appending the ambiguous characters (refer case 2.2) + bytesConsumed += ambiguousByteCount; + ambiguousByteCount = 0; + } + str.append(buffer, startPosn, appendLength); + txtLength += appendLength; + } + if (bufferPosn >= bufferLength) { + if (delPosn > 0 && delPosn < recordDelimiterBytes.length) { + ambiguousByteCount = delPosn; + bytesConsumed -= ambiguousByteCount; //to be consumed in next + } + } + } while (delPosn < recordDelimiterBytes.length + && bytesConsumed < maxBytesToConsume); + if (bytesConsumed > (long) Integer.MAX_VALUE) { + throw new IOException("Too many bytes before delimiter: " + bytesConsumed); + } + return (int) bytesConsumed; + } + + /** + * Read from the InputStream into the given Text. + * + * @param str the object to store the given line + * @param maxLineLength the maximum number of bytes to store into str. + * @return the number of bytes read including the newline + * @throws java.io.IOException if the underlying stream throws + */ + public int readLine(Text str, int maxLineLength) throws IOException { + return readLine(str, maxLineLength, Integer.MAX_VALUE); + } + + /** + * Read from the InputStream into the given Text. + * + * @param str the object to store the given line + * @return the number of bytes read including the newline + * @throws java.io.IOException if the underlying stream throws + */ + public int readLine(Text str) throws IOException { + return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); + } +}
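
A small usage sketch for the LineReader above (illustrative only; the sample bytes are invented, and only constructors and readLine overloads defined in this file are used):

  // readLine() strips the terminator (CR, LF, or CRLF) from 'line' and returns
  // the number of bytes consumed including the terminator; 0 signals EOF.
  byte[] data = "a,1\r\nb,2\nc,3".getBytes();
  LineReader reader = new LineReader(new java.io.ByteArrayInputStream(data));
  org.apache.hadoop.io.Text line = new org.apache.hadoop.io.Text();
  int consumed;
  while ((consumed = reader.readLine(line)) > 0) {
    System.out.println(line + " (" + consumed + " bytes)");
  }
  reader.close();
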

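Finally, a standalone sketch of the page bookkeeping used by HashShuffleAppender earlier in this commit (the write offsets and the 32MB page size are invented for illustration; Pair is org.apache.tajo.util.Pair, as imported there):

  // Mirrors nextPage(): once writes advance more than pageSize bytes past the
  // current page's start offset, the page is recorded as (start, length) and a
  // new page begins at the current position.
  List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
  Pair<Long, Integer> currentPage = new Pair<Long, Integer>(0L, 0);
  long pageSize = 32L * 1024 * 1024; // assumed SHUFFLE_HASH_APPENDER_PAGE_VOLUME of 32 (MB), held in bytes

  long[] offsetsAfterWrites = {10000000L, 40000000L, 70000000L}; // invented appender offsets
  for (long pos : offsetsAfterWrites) {
    if (pos - currentPage.getFirst() > pageSize) {
      currentPage.setSecond((int) (pos - currentPage.getFirst()));
      pages.add(currentPage);
      currentPage = new Pair<Long, Integer>(pos, 0);
    }
  }
  // pages == [(0, 40000000)]; the still-open page starting at 40000000 would be
  // closed by close() using the final file offset, exactly as in the appender.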