http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index df73448,0000000..060bf16 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@@ -1,854 -1,0 +1,882 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.Bytes; + +import java.io.IOException; +import java.text.NumberFormat; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class FileStorageManager extends StorageManager { + private final Log LOG = LogFactory.getLog(FileStorageManager.class); + + static final String OUTPUT_FILE_PREFIX="part-"; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(6); + return fmt; + } + }; + + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + 
fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + return fmt; + } + }; + + protected FileSystem fs; + protected Path tableBaseDir; + protected boolean blocksMetadataEnabled; + private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); + + public FileStorageManager(StoreType storeType) { + super(storeType); + } + + @Override + protected void storageInit() throws IOException { + this.tableBaseDir = TajoConf.getWarehouseDir(conf); + this.fs = tableBaseDir.getFileSystem(conf); + this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + if (!this.blocksMetadataEnabled) + LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) + throws IOException { + FileSystem fs = path.getFileSystem(conf); + FileStatus status = fs.getFileStatus(path); + return getFileScanner(meta, schema, path, status); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) + throws IOException { + Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + return getScanner(meta, schema, fragment); + } + + public FileSystem getFileSystem() { + return this.fs; + } + + public Path getWarehouseDir() { + return this.tableBaseDir; + } + + public void delete(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + fs.delete(tablePath, true); + } + + public boolean exists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + return fileSystem.exists(path); + } + + /** + * This method deletes only data contained in the given path. + * + * @param path The path in which data are deleted. 
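+ * The path itself is preserved; only the entries listed under it are deleted.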
+ * @throws IOException + */ + public void deleteData(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + FileStatus[] fileLists = fileSystem.listStatus(path); + for (FileStatus status : fileLists) { + fileSystem.delete(status.getPath(), true); + } + } + + public Path getTablePath(String tableName) { + return new Path(tableBaseDir, tableName); + } + + @VisibleForTesting + public Appender getAppender(TableMeta meta, Schema schema, Path filePath) + throws IOException { + return getAppender(null, null, meta, schema, filePath); + } + + public FileFragment[] split(String tableName) throws IOException { + Path tablePath = new Path(tableBaseDir, tableName); + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, long fragmentSize) throws IOException { + Path tablePath = new Path(tableBaseDir, tableName); + return split(tableName, tablePath, fragmentSize); + } + + public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen()); + listTablets.add(tablet); + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public FileFragment[] split(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, Path tablePath) throws IOException { + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + private FileFragment[] split(String tableName, Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, + Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += 
defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public long calculateSize(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + long totalSize = 0; + + if (fs.exists(tablePath)) { + totalSize = fs.getContentSummary(tablePath).getLength(); + } + + return totalSize; + } + + ///////////////////////////////////////////////////////////////////////////// + // FileInputFormat Area + ///////////////////////////////////////////////////////////////////////////// - - public static final PathFilter hiddenFileFilter = new PathFilter() { - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }; - + public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) { + if (taskAttemptId == null) { + // For testcase + return workDir; + } + // The final result of a task will be written in a file named part-ss-nnnnnnn, + // where ss is the subquery id associated with this task, and nnnnnn is the task id. + Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, + OUTPUT_FILE_PREFIX + + OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + + OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" + + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); + LOG.info("Output File Path: " + outFilePath); + + return outFilePath; + } + + /** + * Proxy PathFilter that accepts a path only if all filters given in the + * constructor do. Used by the listPaths() to apply the built-in + * hiddenFileFilter together with a user provided one (if any). + */ + private static class MultiPathFilter implements PathFilter { + private List<PathFilter> filters; + + public MultiPathFilter(List<PathFilter> filters) { + this.filters = filters; + } + + public boolean accept(Path path) { + for (PathFilter filter : filters) { + if (!filter.accept(path)) { + return false; + } + } + return true; + } + } + + /** + * List input directories. + * Subclasses may override to, e.g., select only files matching a regular + * expression. + * + * @return array of FileStatus objects + * @throws IOException if zero items. + */ + protected List<FileStatus> listStatus(Path... dirs) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + + List<IOException> errors = new ArrayList<IOException>(); + + // creates a MultiPathFilter with the hiddenFileFilter and the + // user provided one (if any). 
+ List<PathFilter> filters = new ArrayList<PathFilter>(); + filters.add(hiddenFileFilter); + + PathFilter inputFilter = new MultiPathFilter(filters); + + for (int i = 0; i < dirs.length; ++i) { + Path p = dirs[i]; + + FileSystem fs = p.getFileSystem(conf); + FileStatus[] matches = fs.globStatus(p, inputFilter); + if (matches == null) { + errors.add(new IOException("Input path does not exist: " + p)); + } else if (matches.length == 0) { + errors.add(new IOException("Input Pattern " + p + " matches 0 files")); + } else { + for (FileStatus globStat : matches) { + if (globStat.isDirectory()) { + for (FileStatus stat : fs.listStatus(globStat.getPath(), + inputFilter)) { + result.add(stat); + } + } else { + result.add(globStat); + } + } + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result; + } + + /** + * Is the given file splittable? Usually true, but if the file is + * stream compressed, it will not be. + * <p/> + * <code>FileInputFormat</code> implementations can override this and return + * <code>false</code> to ensure that individual input files are never split-up + * so that Mappers process entire files. + * + * @param path the file name to check + * @param status the file status, from which the file length is taken + * @return whether this file is splittable + */ + protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { + Scanner scanner = getFileScanner(meta, schema, path, status); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + private static final double SPLIT_SLOP = 1.1; // 10% slop + + protected int getBlockIndex(BlockLocation[] blkLocations, + long offset) { + for (int i = 0; i < blkLocations.length; i++) { + // is the offset inside this block? + if ((blkLocations[i].getOffset() <= offset) && + (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1; + throw new IllegalArgumentException("Offset " + offset + + " is outside of file (0.." + + fileLength + ")"); + } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types. + */ + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { + return new FileFragment(fragmentId, file, start, length); + } + + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, + String[] hosts) { + return new FileFragment(fragmentId, file, start, length, hosts); + } + + protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) + throws IOException { + return new FileFragment(fragmentId, file, blockLocation); + } + + // For a non-splittable file, 
e.g., a gzip-compressed TextFile + protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations) throws IOException { + + Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet()); + Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { + + @Override + public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { + return v1.getValue().compareTo(v2.getValue()); + } + }); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + return new FileFragment(fragmentId, file, start, length, hosts); + } + + /** + * Get the minimum split size + * + * @return the minimum number of bytes that can be in a split + */ + public long getMinSplitSize() { + return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); + } + + /** + * Get disk ids from the given volume ids + */ + private int[] getDiskIds(VolumeId[] volumeIds) { + int[] diskIds = new int[volumeIds.length]; + for (int i = 0; i < volumeIds.length; i++) { + int diskId = -1; + if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { + diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); + } + diskIds[i] = diskId; + } + return diskIds; + } + + /** + * Build a map from each host to the set of volume ids it serves. + */ + private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) { + Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>(); + for (FileFragment frag : frags) { + String[] hosts = frag.getHosts(); + int[] diskIds = frag.getDiskIds(); + for (int i = 0; i < hosts.length; i++) { + Set<Integer> volumeList = volumeMap.get(hosts[i]); + if (volumeList == null) { + volumeList = new HashSet<Integer>(); + volumeMap.put(hosts[i], volumeList); + } + + if (diskIds.length > 0 && diskIds[i] > -1) { + volumeList.add(diskIds[i]); + } + } + } + + return volumeMap; + } + /** + * Generate the list of files and make them into FileSplits. + * + * @throws IOException + */ + public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... 
inputs) + throws IOException { + // generate splits + + List<Fragment> splits = Lists.newArrayList(); + List<Fragment> volumeSplits = Lists.newArrayList(); + List<BlockLocation> blockLocations = Lists.newArrayList(); + + for (Path p : inputs) { + FileSystem fs = p.getFileSystem(conf); ++ + ArrayList<FileStatus> files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + int previousSplitSize = splits.size(); + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittable(meta, schema, path, file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + + } else { + if (splittable) { + + long minSize = Math.max(getMinSplitSize(), 1); + + long blockSize = file.getBlockSize(); // the s3n REST API reports a block size, but blockLocations has only one entry + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; + + // for s3 + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts())); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts())); + } + } else { // Non splittable + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + } else { + // for zero length files + splits.add(makeSplit(tableName, path, 0, length)); + } + } + if(LOG.isDebugEnabled()){ + LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); + } + } + + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); + LOG.info("Total # of splits: " + splits.size()); + return splits; + } + + private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) + throws IOException { + + int locationSize = blockLocations.size(); + int splitSize = splits.size(); + if (locationSize == 0 || splitSize == 0) return; + + if (locationSize != splitSize) { + // splits and locations don't match up + LOG.warn("Number of block locations not equal to number of splits: " + "#locations=" + locationSize + " #splits=" + splitSize); + return; + } + + DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf); + int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); + int blockLocationIdx = 0; + + Iterator<Fragment> iter = splits.iterator(); + while (locationSize > blockLocationIdx) { + 
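// query volume locations in pages of at most lsLimit blocks per request + 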
+ int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); + List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); + //BlockStorageLocation containing additional volume location information for each replica of each block. + BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); + + for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { + ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); + blockLocationIdx++; + } + } + LOG.info("# of splits with volumeId " + splitSize); + } + + private static class InvalidInputException extends IOException { + List<IOException> errors; + public InvalidInputException(List<IOException> errors) { + this.errors = errors; + } + + @Override + public String getMessage(){ + StringBuffer sb = new StringBuffer(); + int messageLimit = Math.min(errors.size(), 10); + for (int i = 0; i < messageLimit ; i ++) { + sb.append(errors.get(i).getMessage()).append("\n"); + } + + if(messageLimit < errors.size()) + sb.append("skipped .....").append("\n"); + + return sb.toString(); + } + } + + @Override + public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException { + return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath())); + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + if (!tableDesc.isExternal()) { + String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); + String databaseName = splitted[0]; + String simpleTableName = splitted[1]; + + // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) + Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName); + tableDesc.setPath(tablePath.toUri()); + } else { + Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given."); + } + + Path path = new Path(tableDesc.getPath()); + + FileSystem fs = path.getFileSystem(conf); + TableStats stats = new TableStats(); + if (tableDesc.isExternal()) { + if (!fs.exists(path)) { + LOG.error(path.toUri() + " does not exist"); + throw new IOException("ERROR: " + path.toUri() + " does not exist"); + } + } else { + fs.mkdirs(path); + } + + long totalSize = 0; + + try { + totalSize = calculateSize(path); + } catch (IOException e) { + LOG.warn("Cannot calculate the size of the relation", e); + } + + stats.setNumBytes(totalSize); + + if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + } + + tableDesc.setStats(stats); + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + try { + Path path = new Path(tableDesc.getPath()); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Delete table data dir: " + path); + fs.delete(path, true); + } catch (IOException e) { + throw new InternalError(e.getMessage()); + } + } + + @Override - public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException { ++ public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException { + // Listing table data file which is not empty. + // If the table is a partitioned table, return file list which has same partition key. 
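+ // 'currentPage' is passed down as the start file index; at most 'numResultFragments' non-empty files are collected.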
+ Path tablePath = new Path(tableDesc.getPath()); + FileSystem fs = tablePath.getFileSystem(conf); + ++ //In the case of partitioned table, we should return same partition key data files. ++ int partitionDepth = 0; ++ if (tableDesc.hasPartition()) { ++ partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); ++ } ++ + List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); + if (fs.exists(tablePath)) { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numFragments, - new AtomicInteger(0)); ++ getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, ++ new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + } + + List<Fragment> fragments = new ArrayList<Fragment>(); + - //In the case of partitioned table, return same partition key data files. - int numPartitionColumns = 0; - if (tableDesc.hasPartition()) { - numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); - } + String[] previousPartitionPathNames = null; + for (FileStatus eachFile: nonZeroLengthFiles) { + FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); + - if (numPartitionColumns > 0) { ++ if (partitionDepth > 0) { + // finding partition key; + Path filePath = fileFragment.getPath(); + Path parentPath = filePath; - String[] parentPathNames = new String[numPartitionColumns]; - for (int i = 0; i < numPartitionColumns; i++) { ++ String[] parentPathNames = new String[partitionDepth]; ++ for (int i = 0; i < partitionDepth; i++) { + parentPath = parentPath.getParent(); - parentPathNames[numPartitionColumns - i - 1] = parentPath.getName(); ++ parentPathNames[partitionDepth - i - 1] = parentPath.getName(); + } + + // If current partitionKey == previousPartitionKey, add to result. 
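+ // The traversal adds files directory by directory, so the first key change means the whole first partition has been collected.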
+ if (previousPartitionPathNames == null) { + fragments.add(fileFragment); + } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) { + fragments.add(fileFragment); + } else { + break; + } + previousPartitionPathNames = parentPathNames; + } else { + fragments.add(fileFragment); + } + } + + return fragments; + } + ++ /** ++ * Collects non-empty data files under the given path. ++ * ++ * @param fs The file system ++ * @param path The table path ++ * @param result The final result files to be used ++ * @param startFileIndex The index of the first file to include ++ * @param numResultFiles The maximum number of files to collect ++ * @param currentFileIndex Counts the files visited so far ++ * @param partitioned A flag to indicate if this table is partitioned ++ * @param currentDepth Current visiting depth of partition directories ++ * @param maxDepth The partition depth of this table ++ * @throws IOException ++ */ + private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result, + int startFileIndex, int numResultFiles, - AtomicInteger currentFileIndex) throws IOException { ++ AtomicInteger currentFileIndex, boolean partitioned, ++ int currentDepth, int maxDepth) throws IOException { ++ // Intermediate directory + if (fs.isDirectory(path)) { - FileStatus[] files = fs.listStatus(path, FileStorageManager.hiddenFileFilter); ++ ++ FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter); ++ + if (files != null && files.length > 0) { ++ + for (FileStatus eachFile : files) { ++ ++ // check whether enough files have been found + if (result.size() >= numResultFiles) { + return; + } + if (eachFile.isDirectory()) { - getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles, - currentFileIndex); - } else if (eachFile.isFile() && eachFile.getLen() > 0) { ++ ++ getNonZeroLengthDataFiles( ++ fs, ++ eachFile.getPath(), ++ result, ++ startFileIndex, ++ numResultFiles, ++ currentFileIndex, ++ partitioned, ++ currentDepth + 1, // increment the visiting depth ++ maxDepth); ++ ++ // For a partitioned table, ignore files located in intermediate directories; ++ // a file is guaranteed to be in a leaf directory only when currentDepth == maxDepth. ++ } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(eachFile); + } + currentFileIndex.incrementAndGet(); + } + } + } ++ ++ // Files located in leaf directory + } else { + FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus != null && fileStatus.getLen() > 0) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(fileStatus); + } + currentFileIndex.incrementAndGet(); + if (result.size() >= numResultFiles) { + return; + } + } + } + } + + @Override + public StorageProperty getStorageProperty() { + StorageProperty storageProperty = new StorageProperty(); + storageProperty.setSortedInsert(false); + if (storeType == StoreType.RAW) { + storageProperty.setSupportsInsertInto(false); + } else { + storageProperty.setSupportsInsertInto(true); + } + + return storageProperty; + } + + @Override + public void closeStorageManager() { + } + + @Override + public void beforeInsertOrCATS(LogicalNode node) throws IOException { + } + + @Override + public void rollbackOutputCommit(LogicalNode node) throws IOException { + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) + throws IOException { + return null; + } + + /** + * Returns Scanner instance. 
+ * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param path The data file path + * @return Scanner instance + * @throws java.io.IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { + + FileSystem fs = path.getFileSystem(conf); + FileStatus status = fs.getFileStatus(path); + FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + + return getSeekableScanner(conf, meta, schema, fragment, schema); + } +}
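The split loop in getSplits() above can be hard to follow in this flattened diff, so here is the same 10% slop arithmetic as a standalone sketch. It is illustrative only: the class name SplitSlopDemo and the sizes in main() are invented for the example, and the real code additionally maps each split to its block hosts via getBlockIndex().

import java.util.ArrayList;
import java.util.List;

public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1; // the same 10% slop constant used above

  /** Returns {offset, length} pairs describing how a file of the given size is cut. */
  public static List<long[]> computeSplits(long fileLength, long splitSize) {
    List<long[]> splits = new ArrayList<long[]>();
    long bytesRemaining = fileLength;
    // Cut full-size splits while more than 1.1 x splitSize remains, so a short tail
    // is absorbed into the last split instead of becoming a tiny extra fragment.
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits.add(new long[]{fileLength - bytesRemaining, splitSize});
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining > 0) {
      splits.add(new long[]{fileLength - bytesRemaining, bytesRemaining});
    }
    return splits;
  }

  public static void main(String[] args) {
    // A 260MB file with a 128MB split size yields 128MB + 132MB: the trailing
    // 4MB falls within the 10% slop and is merged into the second split.
    for (long[] s : computeSplits(260L << 20, 128L << 20)) {
      System.out.println("offset=" + s[0] + ", length=" + s[1]);
    }
  }
}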
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 0000000,0000000..c1f63a8 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@@ -1,0 -1,0 +1,220 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.storage.json; ++ ++ ++import io.netty.buffer.ByteBuf; ++import net.minidev.json.JSONArray; ++import net.minidev.json.JSONObject; ++import net.minidev.json.parser.JSONParser; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.SchemaUtil; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.common.TajoDataTypes.Type; ++import org.apache.tajo.common.exception.NotImplementedException; ++import org.apache.tajo.datum.DatumFactory; ++import org.apache.tajo.datum.NullDatum; ++import org.apache.tajo.storage.Tuple; ++import org.apache.tajo.storage.text.TextLineDeserializer; ++import org.apache.tajo.storage.text.TextLineParsingError; ++ ++import java.io.IOException; ++import java.util.Iterator; ++ ++public class JsonLineDeserializer extends TextLineDeserializer { ++ private JSONParser parser; ++ private Type [] types; ++ private String [] columnNames; ++ ++ public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { ++ super(schema, meta, targetColumnIndexes); ++ } ++ ++ @Override ++ public void init() { ++ types = SchemaUtil.toTypes(schema); ++ columnNames = SchemaUtil.toSimpleNames(schema); ++ ++ parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE); ++ } ++ ++ @Override ++ public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError { ++ byte [] line = new byte[buf.readableBytes()]; ++ buf.readBytes(line); ++ ++ try { ++ JSONObject object = (JSONObject) parser.parse(line); ++ ++ for (int i = 0; i < targetColumnIndexes.length; i++) { ++ int actualIdx = targetColumnIndexes[i]; ++ String fieldName = columnNames[actualIdx]; ++ ++ if (!object.containsKey(fieldName)) { ++ output.put(actualIdx, NullDatum.get()); ++ continue; ++ } ++ ++ switch (types[actualIdx]) { ++ case BOOLEAN: ++ String boolStr = object.getAsString(fieldName); ++ if (boolStr != null) { ++ output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true"))); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case CHAR: ++ String charStr = object.getAsString(fieldName); ++ if (charStr != null) { ++ output.put(actualIdx, 
DatumFactory.createChar(charStr)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case INT1: ++ case INT2: ++ Number int2Num = object.getAsNumber(fieldName); ++ if (int2Num != null) { ++ output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue())); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case INT4: ++ Number int4Num = object.getAsNumber(fieldName); ++ if (int4Num != null) { ++ output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue())); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case INT8: ++ Number int8Num = object.getAsNumber(fieldName); ++ if (int8Num != null) { ++ output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue())); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case FLOAT4: ++ Number float4Num = object.getAsNumber(fieldName); ++ if (float4Num != null) { ++ output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue())); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case FLOAT8: ++ Number float8Num = object.getAsNumber(fieldName); ++ if (float8Num != null) { ++ output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue())); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case TEXT: ++ String textStr = object.getAsString(fieldName); ++ if (textStr != null) { ++ output.put(actualIdx, DatumFactory.createText(textStr)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case TIMESTAMP: ++ String timestampStr = object.getAsString(fieldName); ++ if (timestampStr != null) { ++ output.put(actualIdx, DatumFactory.createTimestamp(timestampStr)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case TIME: ++ String timeStr = object.getAsString(fieldName); ++ if (timeStr != null) { ++ output.put(actualIdx, DatumFactory.createTime(timeStr)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case DATE: ++ String dateStr = object.getAsString(fieldName); ++ if (dateStr != null) { ++ output.put(actualIdx, DatumFactory.createDate(dateStr)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ case BIT: ++ case BINARY: ++ case VARBINARY: ++ case BLOB: { ++ Object jsonObject = object.get(fieldName); ++ ++ if (jsonObject == null) { ++ output.put(actualIdx, NullDatum.get()); ++ break; ++ } else if (jsonObject instanceof String) { ++ output.put(actualIdx, DatumFactory.createBlob((String)jsonObject)); ++ } else if (jsonObject instanceof JSONArray) { ++ JSONArray jsonArray = (JSONArray) jsonObject; ++ byte[] bytes = new byte[jsonArray.size()]; ++ Iterator<Object> it = jsonArray.iterator(); ++ int arrayIdx = 0; ++ while (it.hasNext()) { ++ bytes[arrayIdx++] = ((Long) it.next()).byteValue(); ++ } ++ if (bytes.length > 0) { ++ output.put(actualIdx, DatumFactory.createBlob(bytes)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ } else { ++ throw new IOException("Unknown json object: " + jsonObject.getClass().getSimpleName()); ++ } ++ break; ++ } ++ case INET4: ++ String inetStr = object.getAsString(fieldName); ++ if (inetStr != null) { ++ output.put(actualIdx, DatumFactory.createInet4(inetStr)); ++ } else { ++ output.put(actualIdx, NullDatum.get()); ++ } ++ break; ++ ++ case NULL_TYPE: ++ output.put(actualIdx, NullDatum.get()); ++ break; ++ ++ default: ++ throw new NotImplementedException(types[actualIdx].name() + " is not supported."); ++ } ++ } ++ ++ } catch (Throwable e) { ++ throw new 
IOException(e); ++ } ++ } ++ ++ @Override ++ public void release() { ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java index 0000000,0000000..6db2c29 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java @@@ -1,0 -1,0 +1,37 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.storage.json; ++ ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.storage.text.TextLineDeserializer; ++import org.apache.tajo.storage.text.TextLineSerDe; ++import org.apache.tajo.storage.text.TextLineSerializer; ++ ++public class JsonLineSerDe extends TextLineSerDe { ++ @Override ++ public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { ++ return new JsonLineDeserializer(schema, meta, targetColumnIndexes); ++ } ++ ++ @Override ++ public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { ++ return new JsonLineSerializer(schema, meta); ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 0000000,0000000..cd31ada new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@@ -1,0 -1,0 +1,130 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.json; ++ ++ ++import net.minidev.json.JSONObject; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.SchemaUtil; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.common.TajoDataTypes.Type; ++import org.apache.tajo.common.exception.NotImplementedException; ++import org.apache.tajo.datum.TextDatum; ++import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; ++import org.apache.tajo.storage.Tuple; ++import org.apache.tajo.storage.text.TextLineSerializer; ++ ++import java.io.IOException; ++import java.io.OutputStream; ++ ++public class JsonLineSerializer extends TextLineSerializer { ++ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); ++ ++ private Type [] types; ++ private String [] simpleNames; ++ private int columnNum; ++ ++ ++ public JsonLineSerializer(Schema schema, TableMeta meta) { ++ super(schema, meta); ++ } ++ ++ @Override ++ public void init() { ++ types = SchemaUtil.toTypes(schema); ++ simpleNames = SchemaUtil.toSimpleNames(schema); ++ columnNum = schema.size(); ++ } ++ ++ @Override ++ public int serialize(OutputStream out, Tuple input) throws IOException { ++ JSONObject jsonObject = new JSONObject(); ++ ++ for (int i = 0; i < columnNum; i++) { ++ if (input.isNull(i)) { ++ continue; ++ } ++ ++ String fieldName = simpleNames[i]; ++ Type type = types[i]; ++ ++ switch (type) { ++ ++ case BOOLEAN: ++ jsonObject.put(fieldName, input.getBool(i)); ++ break; ++ ++ case INT1: ++ case INT2: ++ jsonObject.put(fieldName, input.getInt2(i)); ++ break; ++ ++ case INT4: ++ jsonObject.put(fieldName, input.getInt4(i)); ++ break; ++ ++ case INT8: ++ jsonObject.put(fieldName, input.getInt8(i)); ++ break; ++ ++ case FLOAT4: ++ jsonObject.put(fieldName, input.getFloat4(i)); ++ break; ++ ++ case FLOAT8: ++ jsonObject.put(fieldName, input.getFloat8(i)); ++ break; ++ ++ case CHAR: ++ case TEXT: ++ case VARCHAR: ++ case INET4: ++ case TIMESTAMP: ++ case DATE: ++ case TIME: ++ case INTERVAL: ++ jsonObject.put(fieldName, input.getText(i)); ++ break; ++ ++ case BIT: ++ case BINARY: ++ case BLOB: ++ case VARBINARY: ++ jsonObject.put(fieldName, input.getBytes(i)); ++ break; ++ ++ case NULL_TYPE: ++ break; ++ ++ default: ++ throw new NotImplementedException(types[i].name() + " is not supported."); ++ } ++ } ++ ++ String jsonStr = jsonObject.toJSONString(); ++ byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET); ++ out.write(jsonBytes); ++ return jsonBytes.length; ++ } ++ ++ @Override ++ public void release() { ++ ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index 1448885,0000000..86319e1 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@@ -1,154 -1,0 +1,170 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import org.apache.tajo.storage.BufferPool; +import org.apache.tajo.storage.ByteBufInputChannel; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class ByteBufLineReader implements Closeable { + private static int DEFAULT_BUFFER = 64 * 1024; + + private int bufferSize; + private long readBytes; ++ private boolean eof = false; + private ByteBuf buffer; + private final ByteBufInputChannel channel; + private final AtomicInteger tempReadBytes = new AtomicInteger(); + private final LineSplitProcessor processor = new LineSplitProcessor(); + + public ByteBufLineReader(ByteBufInputChannel channel) { + this(channel, BufferPool.directBuffer(DEFAULT_BUFFER)); + } + + public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) { + this.readBytes = 0; + this.channel = channel; + this.buffer = buf; + this.bufferSize = buf.capacity(); + } + + public long readBytes() { + return readBytes - buffer.readableBytes(); + } + + public long available() throws IOException { + return channel.available() + buffer.readableBytes(); + } + + @Override + public void close() throws IOException { + if (this.buffer.refCnt() > 0) { + this.buffer.release(); + } + this.channel.close(); + } + + public String readLine() throws IOException { + ByteBuf buf = readLineBuf(tempReadBytes); + if (buf != null) { + return buf.toString(CharsetUtil.UTF_8); + } + return null; + } + + private void fillBuffer() throws IOException { + + int tailBytes = 0; + if (this.readBytes > 0) { + this.buffer.markReaderIndex(); + this.buffer.discardSomeReadBytes(); // compact the buffer + tailBytes = this.buffer.writerIndex(); + if (!this.buffer.isWritable()) { + // the line is larger than the buffer + BufferPool.ensureWritable(buffer, bufferSize); + this.bufferSize = buffer.capacity(); + } + } + + boolean release = true; + try { + int readBytes = tailBytes; + for (; ; ) { + int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes); + if (localReadBytes < 0) { ++ if (tailBytes == readBytes) { ++ // no more bytes are in the channel ++ eof = true; ++ } + break; + } + readBytes += localReadBytes; + if (readBytes == bufferSize) { + break; + } + } + this.readBytes += (readBytes - tailBytes); + release = false; - this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) ++ if (!eof) { ++ this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) ++ } + } finally { + if (release) { + buffer.release(); + } + } + } + + /** + * Read a line terminated by one of CR, LF, or CRLF. 
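+ * The returned buffer is a slice without the line terminator, while {@code reads} is set to + * the number of bytes consumed including the terminator; {@code null} is returned once no + * more bytes are available.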
+ */ + public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { ++ if(eof) return null; ++ + int startIndex = buffer.readerIndex(); + int readBytes; + int readable; + int newlineLength; //length of terminating newline + + loop: + while (true) { + readable = buffer.readableBytes(); + if (readable <= 0) { + buffer.readerIndex(startIndex); + fillBuffer(); //compact and fill buffer + if (!buffer.isReadable()) { + return null; + } else { - startIndex = 0; // reset the line start position ++ if (!eof) startIndex = 0; // reset the line start position ++ else startIndex = buffer.readerIndex(); + } + readable = buffer.readableBytes(); + } + + int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor); + if (endIndex < 0) { - buffer.readerIndex(buffer.writerIndex()); ++ // no terminating newline found ++ buffer.readerIndex(buffer.writerIndex()); // move to the end of the buffer ++ if(eof){ ++ readBytes = buffer.readerIndex() - startIndex; ++ newlineLength = 0; ++ break loop; ++ } + } else { + buffer.readerIndex(endIndex + 1); + readBytes = buffer.readerIndex() - startIndex; + if (processor.isPrevCharCR() && buffer.isReadable() + && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { + buffer.skipBytes(1); + newlineLength = 2; + } else { + newlineLength = 1; + } + break loop; + } + } + reads.set(readBytes); + return buffer.slice(startIndex, readBytes - newlineLength); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index 0000000,0000000..f2eebc6 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@@ -1,0 -1,0 +1,96 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.text; ++ ++import io.netty.buffer.ByteBuf; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.datum.Datum; ++import org.apache.tajo.storage.FieldSerializerDeserializer; ++import org.apache.tajo.storage.Tuple; ++ ++import java.io.IOException; ++ ++public class CSVLineDeserializer extends TextLineDeserializer { ++ private FieldSplitProcessor processor; ++ private FieldSerializerDeserializer fieldSerDer; ++ private ByteBuf nullChars; ++ ++ public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { ++ super(schema, meta, targetColumnIndexes); ++ } ++ ++ @Override ++ public void init() { ++ this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta)); ++ ++ if (nullChars != null) { ++ nullChars.release(); ++ } ++ nullChars = TextLineSerDe.getNullChars(meta); ++ ++ fieldSerDer = new TextFieldSerializerDeserializer(); ++ } ++ ++ public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError { ++ int[] projection = targetColumnIndexes; ++ if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { ++ return; ++ } ++ ++ final int rowLength = lineBuf.readableBytes(); ++ int start = 0, fieldLength = 0, end = 0; ++ ++ //Projection ++ int currentTarget = 0; ++ int currentIndex = 0; ++ ++ while (end != -1) { ++ end = lineBuf.forEachByte(start, rowLength - start, processor); ++ ++ if (end < 0) { ++ fieldLength = rowLength - start; ++ } else { ++ fieldLength = end - start; ++ } ++ ++ if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { ++ lineBuf.setIndex(start, start + fieldLength); ++ Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); ++ output.put(currentIndex, datum); ++ currentTarget++; ++ } ++ ++ if (projection.length == currentTarget) { ++ break; ++ } ++ ++ start = end + 1; ++ currentIndex++; ++ } ++ } ++ ++ @Override ++ public void release() { ++ if (nullChars != null) { ++ nullChars.release(); ++ nullChars = null; ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java index 0000000,0000000..2fe7f23 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java @@@ -1,0 -1,0 +1,41 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.text; ++ ++import org.apache.commons.lang.StringEscapeUtils; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.storage.StorageConstants; ++ ++public class CSVLineSerDe extends TextLineSerDe { ++ @Override ++ public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { ++ return new CSVLineDeserializer(schema, meta, targetColumnIndexes); ++ } ++ ++ @Override ++ public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { ++ return new CSVLineSerializer(schema, meta); ++ } ++ ++ public static char getFieldDelimiter(TableMeta meta) { ++ return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER, ++ StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java index 0000000,0000000..48154eb new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java @@@ -1,0 -1,0 +1,70 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.text; ++ ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.datum.Datum; ++import org.apache.tajo.storage.FieldSerializerDeserializer; ++import org.apache.tajo.storage.Tuple; ++ ++import java.io.IOException; ++import java.io.OutputStream; ++ ++public class CSVLineSerializer extends TextLineSerializer { ++ private FieldSerializerDeserializer serde; ++ ++ private byte [] nullChars; ++ private char delimiter; ++ private int columnNum; ++ ++ public CSVLineSerializer(Schema schema, TableMeta meta) { ++ super(schema, meta); ++ } ++ ++ @Override ++ public void init() { ++ nullChars = TextLineSerDe.getNullCharsAsBytes(meta); ++ delimiter = CSVLineSerDe.getFieldDelimiter(meta); ++ columnNum = schema.size(); ++ ++ serde = new TextFieldSerializerDeserializer(); ++ } ++ ++ @Override ++ public int serialize(OutputStream out, Tuple input) throws IOException { ++ int writtenBytes = 0; ++ ++ for (int i = 0; i < columnNum; i++) { ++ Datum datum = input.get(i); ++ writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); ++ ++ if (columnNum - 1 > i) { ++ out.write((byte) delimiter); ++ writtenBytes += 1; ++ } ++ } ++ ++ return writtenBytes; ++ } ++ ++ @Override ++ public void release() { ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index a337509,0000000..7848198 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@@ -1,468 -1,0 +1,475 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index a337509,0000000..7848198
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@@ -1,468 -1,0 +1,475 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
- import org.apache.commons.lang.StringEscapeUtils;
- import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
- import org.apache.tajo.datum.Datum;
- import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
++import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
++import org.apache.tajo.util.ReflectionUtil;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
++import java.util.Map;
++import java.util.concurrent.ConcurrentHashMap;
++
++import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
++import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
+
+public class DelimitedTextFile {
+
+  public static final byte LF = '\n';
- public static int EOF = -1;
+
+  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+
++  /** Caches line SerDe classes. */
++  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
++      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
++
++  /**
++   * By default, DelimitedTextFileScanner uses CSVLineSerDe. If the table property 'text.serde.class' is given,
++   * the specified SerDe class is used instead.
++   *
++   * @return TextLineSerDe
++   */
++  public static TextLineSerDe getLineSerde(TableMeta meta) {
++    TextLineSerDe lineSerDe;
++
++    String serDeClassName;
++
++    // if no serde class is given, the CSV line serde is used.
++    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
++
++    try {
++      Class<? extends TextLineSerDe> serdeClass;
++
++      if (serdeClassCache.containsKey(serDeClassName)) {
++        serdeClass = serdeClassCache.get(serDeClassName);
++      } else {
++        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
++        serdeClassCache.put(serDeClassName, serdeClass);
++      }
++      lineSerDe = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
++    } catch (Throwable e) {
++      throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
++    }
++
++    return lineSerDe;
++  }
++
+  public static class DelimitedTextFileAppender extends FileAppender {
+    private final TableMeta meta;
+    private final Schema schema;
-    private final int columnNum;
+    private final FileSystem fs;
+    private FSDataOutputStream fos;
+    private DataOutputStream outputStream;
+    private CompressionOutputStream deflateFilter;
-    private char delimiter;
+    private TableStatistics stats = null;
+    private Compressor compressor;
+    private CompressionCodecFactory codecFactory;
+    private CompressionCodec codec;
+    private Path compressedPath;
+    private byte[] nullChars;
+    private int BUFFER_SIZE = 128 * 1024;
+    private int bufferedBytes = 0;
+    private long pos = 0;
+
+    private NonSyncByteArrayOutputStream os;
-    private FieldSerializerDeserializer serde;
++    private TextLineSerializer serializer;
+
+    public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                                     final Schema schema, final TableMeta meta, final Path path)
+        throws IOException {
+      super(conf, taskAttemptId, schema, meta, path);
+      this.fs = path.getFileSystem(conf);
+      this.meta = meta;
+      this.schema = schema;
-      this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
-          StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-      this.columnNum = schema.size();
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
-          NullDatum.DEFAULT_TEXT));
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
++    }
++
++    public TextLineSerDe getLineSerde() {
++      return DelimitedTextFile.getLineSerde(meta);
+    }
+
+    @Override
+    public void init() throws IOException {
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+        codecFactory = new CompressionCodecFactory(conf);
+        codec = codecFactory.getCodecByClassName(codecName);
+        compressor = CodecPool.getCompressor(codec);
+        if (compressor != null) compressor.reset(); //builtin gzip is null
+
+        String extension = codec.getDefaultExtension();
+        compressedPath = path.suffix(extension);
+
+        if (fs.exists(compressedPath)) {
+          throw new AlreadyExistsStorageException(compressedPath);
+        }
+
+        fos = fs.create(compressedPath);
+        deflateFilter = codec.createOutputStream(fos, compressor);
+        outputStream = new DataOutputStream(deflateFilter);
+
+      } else {
+        if (fs.exists(path)) {
+          throw new AlreadyExistsStorageException(path);
+        }
+        fos = fs.create(path);
+        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+      }
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
-      serde = new TextFieldSerializerDeserializer();
++      serializer = getLineSerde().createSerializer(schema, meta);
++      serializer.init();
+
+      if (os == null) {
+        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+      }
+
+      os.reset();
+      pos = fos.getPos();
+      bufferedBytes = 0;
+      super.init();
+    }
+
-
+    @Override
+    public void addTuple(Tuple tuple) throws IOException {
-      Datum datum;
-      int rowBytes = 0;
-
-      for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-        rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars);
++      // write
++      int rowBytes = serializer.serialize(os, tuple);
+
-        if (columnNum - 1 > i) {
-          os.write((byte) delimiter);
-          rowBytes += 1;
-        }
-      }
++      // new line
+      os.write(LF);
+      rowBytes += 1;
+
++      // update positions
+      pos += rowBytes;
+      bufferedBytes += rowBytes;
++
++      // flush the buffer if it is full
+      if (bufferedBytes > BUFFER_SIZE) {
+        flushBuffer();
+      }
+      // Statistical section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    private void flushBuffer() throws IOException {
+      if (os.getLength() > 0) {
+        os.writeTo(outputStream);
+        os.reset();
+        bufferedBytes = 0;
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+      outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+      try {
++        serializer.release();
++
+        if(outputStream != null){
+          flush();
+        }
+
+        // Statistical section
+        if (enabledStats) {
+          stats.setNumBytes(getOffset());
+        }
+
+        if (deflateFilter != null) {
+          deflateFilter.finish();
+          deflateFilter.resetState();
+          deflateFilter = null;
+        }
+
+        os.close();
+      } finally {
+        IOUtils.cleanup(LOG, fos);
+        if (compressor != null) {
+          CodecPool.returnCompressor(compressor);
+          compressor = null;
+        }
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+
+    public boolean isCompress() {
+      return compressor != null;
+    }
+
+    public String getExtension() {
+      return codec != null ? codec.getDefaultExtension() : "";
+    }
+  }
+
+  public static class DelimitedTextFileScanner extends FileScanner {
-
+    private boolean splittable = false;
+    private final long startOffset;
-    private final long endOffset;
+
++    private final long endOffset;
++    /** The number of records actually read */
+    private int recordCount = 0;
+    private int[] targetColumnIndexes;
+
-    private ByteBuf nullChars;
-    private FieldSerializerDeserializer serde;
+    private DelimitedLineReader reader;
-    private FieldSplitProcessor processor;
++    private TextLineDeserializer deserializer;
++
++    private int errorPrintOutMaxNum = 5;
++    /** Maximum number of permissible errors */
++    private int errorToleranceMaxNum;
++    /** How many errors have occurred? */
++    private int errorNum;
+
+    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+                                    final Fragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+      reader = new DelimitedLineReader(conf, this.fragment);
+      if (!reader.isCompressed()) {
+        splittable = true;
+      }
+
+      startOffset = this.fragment.getStartKey();
+      endOffset = startOffset + fragment.getLength();
+
-      //Delimiter
-      String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-      this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
++      errorToleranceMaxNum =
++          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
+    }
+
++
+    @Override
+    public void init() throws IOException {
-      if (nullChars != null) {
-        nullChars.release();
-      }
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
-          NullDatum.DEFAULT_TEXT));
-      byte[] bytes;
-      if (StringUtils.isEmpty(nullCharacters)) {
-        bytes = NullDatum.get().asTextBytes();
-      } else {
-        bytes = nullCharacters.getBytes();
-      }
-
-      nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
-      nullChars.writeBytes(bytes);
-
+      if (reader != null) {
+        reader.close();
+      }
+      reader = new DelimitedLineReader(conf, fragment);
+      reader.init();
+      recordCount = 0;
+
+      if (targets == null) {
+        targets = schema.toArray();
+      }
+
+      targetColumnIndexes = new int[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+      }
+
-      serde = new TextFieldSerializerDeserializer();
-
+      super.init();
+      Arrays.sort(targetColumnIndexes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
+      }
+
+      if (startOffset > 0) {
+        reader.readLine(); // skip the first line
+      }
-    }
-
-    public ByteBuf readLine() throws IOException {
-      ByteBuf buf = reader.readLine();
-      if (buf == null) {
-        return null;
-      } else {
-        recordCount++;
-      }
++      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
++      deserializer.init();
++    }
+
-      return buf;
++    public TextLineSerDe getLineSerde() {
++      return DelimitedTextFile.getLineSerde(meta);
+    }
+
+    @Override
+    public float getProgress() {
+      try {
+        if (!reader.isReadable()) {
+          return 1.0f;
+        }
+        long filePos = reader.getCompressedPosition();
+        if (startOffset == filePos) {
+          return 0.0f;
+        } else {
+          long readBytes = filePos - startOffset;
+          long remainingBytes = Math.max(endOffset - filePos, 0);
+          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
++
++      if (!reader.isReadable()) {
++        return null;
++      }
++
++      if (targets.length == 0) {
++        return EmptyTuple.get();
++      }
++
++      VTuple tuple = new VTuple(schema.size());
++
+      try {
-        if (!reader.isReadable()) return null;
+
-        ByteBuf buf = readLine();
-        if (buf == null) return null;
++        // this loop continues until one tuple is built or EOS (end of stream) is reached.
++        do {
+
-          if (targets.length == 0) {
-            return EmptyTuple.get();
-          }
++          ByteBuf buf = reader.readLine();
++          if (buf == null) {
++            return null;
++          }
+
-          VTuple tuple = new VTuple(schema.size());
-          fillTuple(schema, tuple, buf, targetColumnIndexes);
-          return tuple;
-        } catch (Throwable t) {
-          LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
-          throw new IOException(t);
-        }
-      }
++          try {
+
-      private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
-        int[] projection = target;
-        if (lineBuf == null || target == null || target.length == 0) {
-          return;
-        }
++            deserializer.deserialize(buf, tuple);
++            // if a line is parsed successfully, it exits this loop.
++            break;
+
-        final int rowLength = lineBuf.readableBytes();
-        int start = 0, fieldLength = 0, end = 0;
++          } catch (TextLineParsingError tae) {
+
-        //Projection
-        int currentTarget = 0;
-        int currentIndex = 0;
++            errorNum++;
+
-        while (end != -1) {
-          end = lineBuf.forEachByte(start, rowLength - start, processor);
++            // suppress excessive log output, which could degrade performance
++            if (errorNum < errorPrintOutMaxNum) {
++              LOG.warn("Ignore Parse Error (" + errorNum + "): ", tae);
++            }
+
-          if (end < 0) {
-            fieldLength = rowLength - start;
-          } else {
-            fieldLength = end - start;
-          }
++            // Only when a maximum error tolerance limit is set (i.e., errorToleranceMaxNum >= 0),
++            // it checks whether the number of parsing errors exceeds that limit.
++            // Otherwise, it ignores all parsing errors.
++            if (errorToleranceMaxNum >= 0 && errorNum > errorToleranceMaxNum) {
++              throw tae;
++            }
++            continue;
++          }
+
-          if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
-            lineBuf.setIndex(start, start + fieldLength);
-            Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-            dst.put(currentIndex, datum);
-            currentTarget++;
-          }
++        } while (reader.isReadable()); // continue until EOS
+
-          if (projection.length == currentTarget) {
-            break;
-          }
++        // recordCount is the number of records actually read; it is incremented here.
++        recordCount++;
+
-          start = end + 1;
-          currentIndex++;
++        return tuple;
++
++      } catch (Throwable t) {
++        LOG.error(t);
++        throw new IOException(t);
+      }
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
-        if (nullChars != null) {
-          nullChars.release();
-          nullChars = null;
++        if (deserializer != null) {
++          deserializer.release();
+        }
+
+        if (tableStats != null && reader != null) {
+          tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
+          tableStats.setNumRows(recordCount);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
+        }
+      } finally {
+        IOUtils.cleanup(LOG, reader);
+        reader = null;
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return true;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return splittable;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if (tableStats != null && reader != null) {
+        tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
+        tableStats.setNumRows(recordCount);
+        tableStats.setNumBytes(fragment.getLength());
+      }
+      return tableStats;
+    }
+  }
+}
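
[Editor's sketch] To make the scanner's error-tolerance behavior concrete, a short usage sketch (not
part of the patch): the option key comes from the TEXT_ERROR_TOLERANCE_MAXNUM constant statically
imported above, while conf, schema, meta, and fragment are assumed to be prepared by the caller.

    meta.putOption(StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM, "10"); // throw once more than 10 lines fail to parse
    // a negative value makes the scanner silently skip every unparsable line
    Scanner scanner = new DelimitedTextFile.DelimitedTextFileScanner(conf, schema, meta, fragment);
    scanner.init();
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {
      // malformed lines within the tolerance were skipped, not returned
    }
    scanner.close();
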
