Repository: tajo Updated Branches: refs/heads/master 4b1b7799d -> d261234ff
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java new file mode 100644 index 0000000..6ab8574 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -0,0 +1,1227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.text.NumberFormat; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class FileTablespace extends Tablespace { + private final Log LOG = LogFactory.getLog(FileTablespace.class); + + static final String OUTPUT_FILE_PREFIX="part-"; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(6); + return fmt; + } + }; + + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + 
fmt.setMinimumIntegerDigits(3); + return fmt; + } + }; + + protected FileSystem fs; + protected Path tableBaseDir; + protected boolean blocksMetadataEnabled; + private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); + + public FileTablespace(String storeType) { + super(storeType); + } + + @Override + protected void storageInit() throws IOException { + this.tableBaseDir = TajoConf.getWarehouseDir(conf); + this.fs = tableBaseDir.getFileSystem(conf); + this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + if (!this.blocksMetadataEnabled) + LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) + throws IOException { + FileSystem fs = path.getFileSystem(conf); + FileStatus status = fs.getFileStatus(path); + return getFileScanner(meta, schema, path, status); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) + throws IOException { + Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + return getScanner(meta, schema, fragment); + } + + public FileSystem getFileSystem() { + return this.fs; + } + + public void delete(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + fs.delete(tablePath, true); + } + + public boolean exists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + return fileSystem.exists(path); + } + + public Path getTablePath(String tableName) { + return new Path(tableBaseDir, tableName); + } + + private String partitionPath = ""; + private int currentDepth = 0; + + /** + * Set a specific partition path for partition-column only queries + * @param path The partition prefix path + */ + public void setPartitionPath(String path) { partitionPath = path; } + + /** + * Set a depth of partition path for partition-column only queries + * @param depth Depth of partitions + */ + public void setCurrentDepth(int depth) { currentDepth = depth; } + + @VisibleForTesting + public Appender getAppender(TableMeta meta, Schema schema, Path filePath) + throws IOException { + return getAppender(null, null, meta, schema, filePath); + } + + public FileFragment[] split(String tableName) throws IOException { + Path tablePath = new Path(tableBaseDir, tableName); + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, long fragmentSize) throws IOException { + Path tablePath = new Path(tableBaseDir, tableName); + return split(tableName, tablePath, fragmentSize); + } + + public FileFragment[] split(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, Path tablePath) throws IOException { + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + private FileFragment[] split(String tableName, Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) 
{ + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, + Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public long calculateSize(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + long totalSize = 0; + + if (fs.exists(tablePath)) { + totalSize = fs.getContentSummary(tablePath).getLength(); + } + + return totalSize; + } + + ///////////////////////////////////////////////////////////////////////////// + // FileInputFormat Area + ///////////////////////////////////////////////////////////////////////////// + public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) { + if (taskAttemptId == null) { + // For testcase + return workDir; + } + // The final result of a task will be written in a file named part-ss-nnnnnnn, + // where ss is the stage id associated with this task, and nnnnnn is the task id. + Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, + OUTPUT_FILE_PREFIX + + OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + + OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" + + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); + LOG.info("Output File Path: " + outFilePath); + + return outFilePath; + } + + /** + * Proxy PathFilter that accepts a path only if all filters given in the + * constructor do. Used by the listPaths() to apply the built-in + * hiddenFileFilter together with a user provided one (if any). + */ + private static class MultiPathFilter implements PathFilter { + private List<PathFilter> filters; + + public MultiPathFilter(List<PathFilter> filters) { + this.filters = filters; + } + + public boolean accept(Path path) { + for (PathFilter filter : filters) { + if (!filter.accept(path)) { + return false; + } + } + return true; + } + } + + /** + * List input directories. + * Subclasses may override to, e.g., select only files matching a regular + * expression. + * + * @return array of FileStatus objects + * @throws IOException if zero items. 
+ */ + protected List<FileStatus> listStatus(Path... dirs) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + + List<IOException> errors = new ArrayList<IOException>(); + + // creates a MultiPathFilter with the hiddenFileFilter and the + // user provided one (if any). + List<PathFilter> filters = new ArrayList<PathFilter>(); + filters.add(hiddenFileFilter); + + PathFilter inputFilter = new MultiPathFilter(filters); + + for (int i = 0; i < dirs.length; ++i) { + Path p = dirs[i]; + + FileSystem fs = p.getFileSystem(conf); + FileStatus[] matches = fs.globStatus(p, inputFilter); + if (matches == null) { + errors.add(new IOException("Input path does not exist: " + p)); + } else if (matches.length == 0) { + errors.add(new IOException("Input Pattern " + p + " matches 0 files")); + } else { + for (FileStatus globStat : matches) { + if (globStat.isDirectory()) { + for (FileStatus stat : fs.listStatus(globStat.getPath(), + inputFilter)) { + result.add(stat); + } + } else { + result.add(globStat); + } + } + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result; + } + + /** + * Is the given filename splitable? Usually, true, but if the file is + * stream compressed, it will not be. + * <p/> + * <code>FileInputFormat</code> implementations can override this and return + * <code>false</code> to ensure that individual input files are never split-up + * so that Mappers process entire files. + * + * + * @param path the file name to check + * @param status get the file length + * @return is this file isSplittable? + */ + protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { + Scanner scanner = getFileScanner(meta, schema, path, status); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + private static final double SPLIT_SLOP = 1.1; // 10% slop + + protected int getBlockIndex(BlockLocation[] blkLocations, + long offset) { + for (int i = 0; i < blkLocations.length; i++) { + // is the offset inside this block? + if ((blkLocations[i].getOffset() <= offset) && + (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1; + throw new IllegalArgumentException("Offset " + offset + + " is outside of file (0.." + + fileLength + ")"); + } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { + return new FileFragment(fragmentId, file, start, length); + } + + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, + String[] hosts) { + return new FileFragment(fragmentId, file, start, length, hosts); + } + + protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) + throws IOException { + return new FileFragment(fragmentId, file, blockLocation); + } + + // for Non Splittable. 
eg, compressed gzip TextFile + protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations) throws IOException { + + Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet()); + Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { + + @Override + public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { + return v1.getValue().compareTo(v2.getValue()); + } + }); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + return new FileFragment(fragmentId, file, start, length, hosts); + } + + /** + * Get the minimum split size + * + * @return the minimum number of bytes that can be in a split + */ + public long getMinSplitSize() { + return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); + } + + /** + * Get Disk Ids by Volume Bytes + */ + private int[] getDiskIds(VolumeId[] volumeIds) { + int[] diskIds = new int[volumeIds.length]; + for (int i = 0; i < volumeIds.length; i++) { + int diskId = -1; + if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { + diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); + } + diskIds[i] = diskId; + } + return diskIds; + } + + /** + * Generate the map of host and make them into Volume Ids. + * + */ + private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) { + Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>(); + for (FileFragment frag : frags) { + String[] hosts = frag.getHosts(); + int[] diskIds = frag.getDiskIds(); + for (int i = 0; i < hosts.length; i++) { + Set<Integer> volumeList = volumeMap.get(hosts[i]); + if (volumeList == null) { + volumeList = new HashSet<Integer>(); + volumeMap.put(hosts[i], volumeList); + } + + if (diskIds.length > 0 && diskIds[i] > -1) { + volumeList.add(diskIds[i]); + } + } + } + + return volumeMap; + } + /** + * Generate the list of files and make them into FileSplits. + * + * @throws IOException + */ + public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... 
inputs) + throws IOException { + // generate splits' + + List<Fragment> splits = Lists.newArrayList(); + List<Fragment> volumeSplits = Lists.newArrayList(); + List<BlockLocation> blockLocations = Lists.newArrayList(); + + for (Path p : inputs) { + FileSystem fs = p.getFileSystem(conf); + + ArrayList<FileStatus> files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + int previousSplitSize = splits.size(); + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittable(meta, schema, path, file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + + } else { + if (splittable) { + + long minSize = Math.max(getMinSplitSize(), 1); + + long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; + + // for s3 + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts())); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts())); + } + } else { // Non splittable + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + } else { + //for zero length files + splits.add(makeSplit(tableName, path, 0, length)); + } + } + if(LOG.isDebugEnabled()){ + LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); + } + } + + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); + LOG.info("Total # of splits: " + splits.size()); + return splits; + } + + private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) + throws IOException { + + int locationSize = blockLocations.size(); + int splitSize = splits.size(); + if (locationSize == 0 || splitSize == 0) return; + + if (locationSize != splitSize) { + // splits and locations don't match up + LOG.warn("Number of block locations not equal to number of splits: " + + "#locations=" + locationSize + + " #splits=" + splitSize); + return; + } + + DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf); + int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); + int blockLocationIdx = 0; + + Iterator<Fragment> iter = splits.iterator(); + while (locationSize > blockLocationIdx) { + + 
int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); + List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); + //BlockStorageLocation containing additional volume location information for each replica of each block. + BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); + + for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { + ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); + blockLocationIdx++; + } + } + LOG.info("# of splits with volumeId " + splitSize); + } + + private static class InvalidInputException extends IOException { + List<IOException> errors; + public InvalidInputException(List<IOException> errors) { + this.errors = errors; + } + + @Override + public String getMessage(){ + StringBuffer sb = new StringBuffer(); + int messageLimit = Math.min(errors.size(), 10); + for (int i = 0; i < messageLimit ; i ++) { + sb.append(errors.get(i).getMessage()).append("\n"); + } + + if(messageLimit < errors.size()) + sb.append("skipped .....").append("\n"); + + return sb.toString(); + } + } + + @Override + public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException { + return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath())); + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + if (!tableDesc.isExternal()) { + String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); + String databaseName = splitted[0]; + String simpleTableName = splitted[1]; + + // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) + Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName); + tableDesc.setPath(tablePath.toUri()); + } else { + Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given."); + } + + Path path = new Path(tableDesc.getPath()); + + FileSystem fs = path.getFileSystem(conf); + TableStats stats = new TableStats(); + if (tableDesc.isExternal()) { + if (!fs.exists(path)) { + LOG.error(path.toUri() + " does not exist"); + throw new IOException("ERROR: " + path.toUri() + " does not exist"); + } + } else { + fs.mkdirs(path); + } + + long totalSize = 0; + + try { + totalSize = calculateSize(path); + } catch (IOException e) { + LOG.warn("Cannot calculate the size of the relation", e); + } + + stats.setNumBytes(totalSize); + + if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + } + + tableDesc.setStats(stats); + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + try { + Path path = new Path(tableDesc.getPath()); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Delete table data dir: " + path); + fs.delete(path, true); + } catch (IOException e) { + throw new InternalError(e.getMessage()); + } + } + + @Override + public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException { + // Listing table data file which is not empty. + // If the table is a partitioned table, return file list which has same partition key. + Path tablePath = new Path(tableDesc.getPath()); + FileSystem fs = tablePath.getFileSystem(conf); + + //In the case of partitioned table, we should return same partition key data files. 
+ int partitionDepth = 0; + if (tableDesc.hasPartition()) { + partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns().size(); + } + + List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); + if (fs.exists(tablePath)) { + if (!partitionPath.isEmpty()) { + Path partPath = new Path(tableDesc.getPath() + partitionPath); + if (fs.exists(partPath)) { + getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth); + } + } else { + getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + } + } + + List<Fragment> fragments = new ArrayList<Fragment>(); + + String[] previousPartitionPathNames = null; + for (FileStatus eachFile: nonZeroLengthFiles) { + FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); + + if (partitionDepth > 0) { + // finding partition key; + Path filePath = fileFragment.getPath(); + Path parentPath = filePath; + String[] parentPathNames = new String[partitionDepth]; + for (int i = 0; i < partitionDepth; i++) { + parentPath = parentPath.getParent(); + parentPathNames[partitionDepth - i - 1] = parentPath.getName(); + } + + // If current partitionKey == previousPartitionKey, add to result. + if (previousPartitionPathNames == null) { + fragments.add(fileFragment); + } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) { + fragments.add(fileFragment); + } else { + break; + } + previousPartitionPathNames = parentPathNames; + } else { + fragments.add(fileFragment); + } + } + + return fragments; + } + + /** + * + * @param fs + * @param path The table path + * @param result The final result files to be used + * @param startFileIndex + * @param numResultFiles + * @param currentFileIndex + * @param partitioned A flag to indicate if this table is partitioned + * @param currentDepth Current visiting depth of partition directories + * @param maxDepth The partition depth of this table + * @throws IOException + */ + private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result, + int startFileIndex, int numResultFiles, + AtomicInteger currentFileIndex, boolean partitioned, + int currentDepth, int maxDepth) throws IOException { + // Intermediate directory + if (fs.isDirectory(path)) { + + FileStatus[] files = fs.listStatus(path, Tablespace.hiddenFileFilter); + + if (files != null && files.length > 0) { + + for (FileStatus eachFile : files) { + + // checking if the enough number of files are found + if (result.size() >= numResultFiles) { + return; + } + if (eachFile.isDirectory()) { + + getNonZeroLengthDataFiles( + fs, + eachFile.getPath(), + result, + startFileIndex, + numResultFiles, + currentFileIndex, + partitioned, + currentDepth + 1, // increment a visiting depth + maxDepth); + + // if partitioned table, we should ignore files located in the intermediate directory. + // we can ensure that this file is in leaf directory if currentDepth == maxDepth. 
+ } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(eachFile); + } + currentFileIndex.incrementAndGet(); + } + } + } + + // Files located in leaf directory + } else { + FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus != null && fileStatus.getLen() > 0) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(fileStatus); + } + currentFileIndex.incrementAndGet(); + if (result.size() >= numResultFiles) { + return; + } + } + } + } + + @Override + public StorageProperty getStorageProperty() { + StorageProperty storageProperty = new StorageProperty(); + storageProperty.setSortedInsert(false); + if (storeType.equalsIgnoreCase("RAW")) { + storageProperty.setSupportsInsertInto(false); + } else { + storageProperty.setSupportsInsertInto(true); + } + + return storageProperty; + } + + @Override + public void close() { + } + + @Override + public void beforeInsertOrCATS(LogicalNode node) throws IOException { + } + + @Override + public void rollbackOutputCommit(LogicalNode node) throws IOException { + } + + @Override + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + } + + @Override + public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) + throws IOException { + return null; + } + + @Override + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, + Schema schema, TableDesc tableDesc) throws IOException { + return commitOutputData(queryContext, true); + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) + throws IOException { + return null; + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param changeFileSeq If true change result file name with max sequence. + * @return Saved path + * @throws java.io.IOException + */ + protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + Path finalOutputDir; + if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { + finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); + try { + FileSystem fs = stagingResultDir.getFileSystem(conf); + + if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + ContentSummary summary = fs.getContentSummary(stagingResultDir); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. 
+ Map<Path, Path> renameDirs = TUtil.newHashMap(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. + Map<Path, Path> recoveryDirs = TUtil.newHashMap(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } + + // Recovery renamed dirs + for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } + + throw new IOException(ioe.getMessage()); + } + } else { // no partition + try { + + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. + if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); + + for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } + + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } + + // Move the results to the final output dir. 
+ for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + + // Check the final output dir + committed = fs.exists(finalOutputDir); + + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } + + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } + + throw new IOException(ioe.getMessage()); + } + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + + if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); + } + } else { + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } + } else { // CREATE TABLE AS SELECT (CTAS) + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } else { + fs.rename(stagingResultDir, finalOutputDir); + } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } + } + + // remove the staging directory if the final output dir is given. + Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + + return finalOutputDir; + } + + /** + * Attach the sequence number to the output file name and than move the file into the final result path. 
+ * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws java.io.IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq, boolean changeFileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (changeFileSeq) { + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + } + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } + } + } + } + + /** + * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws java.io.IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Make sure all files are moved. 
+ * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws java.io.IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. + * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws java.io.IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 1846ed6..66c7f13 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -82,7 +82,7 @@ public class HashShuffleAppenderManager { if (!fs.exists(dataFile.getParent())) { fs.mkdirs(dataFile.getParent()); } - FileAppender appender = (FileAppender)((FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf)) + FileAppender appender = (FileAppender)((FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf)) .getAppender(meta, outSchema, dataFile); appender.enableStats(); appender.init(); 
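The comment in getAppenderFilePath() above documents the output naming convention introduced by FileTablespace: a task's result file is named part-<stage>-<task>-<seq>, with the stage id zero-padded to two digits, the task id to six, and the file sequence to three. The following is a minimal, illustrative sketch of how the thread-local NumberFormat instances assemble such a name; the stage, task, and sequence values are made-up examples, not taken from the commit.

// Illustrative sketch of the part-file naming used by FileTablespace.getAppenderFilePath().
// The ids below (stage 1, task 7, sequence 0) are example values only.
import java.text.NumberFormat;

public class PartFileNameSketch {
  public static void main(String[] args) {
    NumberFormat stageFmt = NumberFormat.getInstance();
    stageFmt.setGroupingUsed(false);
    stageFmt.setMinimumIntegerDigits(2);   // matches OUTPUT_FILE_FORMAT_STAGE

    NumberFormat taskFmt = NumberFormat.getInstance();
    taskFmt.setGroupingUsed(false);
    taskFmt.setMinimumIntegerDigits(6);    // matches OUTPUT_FILE_FORMAT_TASK

    NumberFormat seqFmt = NumberFormat.getInstance();
    seqFmt.setGroupingUsed(false);
    seqFmt.setMinimumIntegerDigits(3);     // matches OUTPUT_FILE_FORMAT_SEQ

    // Prints "part-01-000007-000": execution block 1, task 7, first output file.
    String fileName = "part-" + stageFmt.format(1) + "-" + taskFmt.format(7) + "-" + seqFmt.format(0);
    System.out.println(fileName);
  }
}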
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 779f908..75ad0d5 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -122,7 +122,7 @@ public class TestCompressionStorages { String fileName = "Compression_" + codec.getSimpleName(); Path tablePath = new Path(testDir, fileName); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java index 41c6c67..76b5c2f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java @@ -80,7 +80,7 @@ public class TestFileStorageManager { Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); fs.mkdirs(path.getParent()); - FileStorageManager fileStorageManager = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri()); Appender appender = fileStorageManager.getAppender(meta, schema, path); @@ -127,7 +127,7 @@ public class TestFileStorageManager { } assertTrue(fs.exists(tablePath)); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf); assertEquals(fs.getUri(), sm.getFileSystem().getUri()); Schema schema = new Schema(); @@ -181,7 +181,7 @@ public class TestFileStorageManager { DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl); } assertTrue(fs.exists(tablePath)); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf); assertEquals(fs.getUri(), sm.getFileSystem().getUri()); Schema schema = new Schema(); @@ -220,11 +220,11 @@ public class TestFileStorageManager { try { /* Local FileSystem */ - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getStorageManager(conf, "CSV"); + FileTablespace sm = (FileTablespace) TableSpaceManager.getStorageManager(conf, "CSV"); assertEquals(fs.getUri(), sm.getFileSystem().getUri()); /* Distributed FileSystem */ - sm = (FileStorageManager) 
TableSpaceManager.getStorageManager(tajoConf, "CSV"); + sm = (FileTablespace) TableSpaceManager.getStorageManager(tajoConf, "CSV"); assertNotEquals(fs.getUri(), sm.getFileSystem().getUri()); assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri()); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java index 1222fae..235bfaa 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -50,14 +50,14 @@ public class TestFileSystems { private static String TEST_PATH = "target/test-data/TestFileSystem"; private TajoConf conf; - private FileStorageManager sm; + private FileTablespace sm; private FileSystem fs; private Path testDir; public TestFileSystems(FileSystem fs) throws IOException { this.fs = fs; this.conf = new TajoConf(fs.getConf()); - sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); testDir = getTestDir(this.fs, TEST_PATH); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 82acaf3..04f8a90 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -49,7 +49,7 @@ import static org.junit.Assert.*; @RunWith(Parameterized.class) public class TestMergeScanner { private TajoConf conf; - StorageManager sm; + Tablespace sm; private static String TEST_PATH = "target/test-data/TestMergeScanner"; private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA = http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 286902a..4ca61e4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -155,7 +155,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Splitable.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -210,7 +210,7 @@ public class TestStorages { TableMeta meta = 
CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Splitable.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -271,7 +271,7 @@ public class TestStorages { } Path tablePath = new Path(testDir, "testProjection.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); int tupleNum = 10000; @@ -347,7 +347,7 @@ public class TestStorages { meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); } - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Path tablePath = new Path(testDir, "testVariousTypes.data"); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); @@ -425,7 +425,7 @@ public class TestStorages { } Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); @@ -513,7 +513,7 @@ public class TestStorages { meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -583,7 +583,7 @@ public class TestStorages { meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -653,7 +653,7 @@ public class TestStorages { meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -727,7 +727,7 @@ public class TestStorages { meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -789,7 +789,7 @@ public class TestStorages { 
TableMeta meta = CatalogUtil.newTableMeta(storeType, options); Path tablePath = new Path(testDir, "testTime.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); @@ -831,7 +831,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Seekable.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -921,7 +921,7 @@ public class TestStorages { conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize); } - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Path tablePath = new Path(testDir, "testMaxValue.data"); Appender appender = sm.getAppender(meta, schema, tablePath); @@ -977,7 +977,7 @@ public class TestStorages { meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); Path tablePath = new Path(testDir, "testLessThanSchemaSize.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, dataSchema, tablePath); appender.init(); @@ -1041,7 +1041,7 @@ public class TestStorages { meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); Path tablePath = new Path(testDir, "test_storetype_oversize.data"); - FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); Appender appender = sm.getAppender(meta, dataSchema, tablePath); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index ae0fd58..e15c474 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -89,7 +89,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindValue_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i++) { @@ -177,7 +177,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType); - FileAppender appender = (FileAppender) ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)) + FileAppender 
appender = (FileAppender) ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) .getAppender(meta, schema, tablePath); appender.init(); @@ -256,7 +256,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i += 2) { @@ -326,7 +326,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i++) { @@ -416,7 +416,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)) + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) .getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -496,7 +496,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindMinValue" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -579,7 +579,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testMinMax_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 5; i < TUPLE_NUM; i += 2) { @@ -683,7 +683,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -763,7 +763,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -854,7 +854,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType); 
- Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index cebeeb2..09191f4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex { Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i++) { @@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex { Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV", "table1.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i ++ ) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index da49d1d..6a9e7ce 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -28,11 +28,11 @@ <!-- Storage Manager Configuration --> <property> <name>tajo.storage.manager.hdfs.class</name> - <value>org.apache.tajo.storage.FileStorageManager</value> + <value>org.apache.tajo.storage.FileTablespace</value> </property> <property> <name>tajo.storage.manager.hbase.class</name> - <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + <value>org.apache.tajo.storage.hbase.HBaseTablespace</value> </property> <!--- Registered Scanner Handler -->
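Taken together, the test and configuration updates above only swap the FileStorageManager/StorageManager names for FileTablespace/Tablespace; the call pattern stays the same. Below is a minimal usage sketch mirroring the updated tests. The column names and table path are example values, and the TableMeta is passed in as a parameter so the sketch does not depend on a particular store-type signature; none of this is part of the commit itself.

// Illustrative sketch: obtain the renamed FileTablespace and use its appender/scanner,
// as the updated tests do. Schema columns and the table path are example assumptions.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.TableSpaceManager;

public class TablespaceUsageSketch {
  public static void writeThenRead(TajoConf conf, TableMeta meta, Path tablePath) throws IOException {
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("name", Type.TEXT);

    // Same lookup the tests use, now cast to the renamed FileTablespace.
    FileTablespace space = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);

    // Write path: appender bound to the table meta, schema, and target file.
    Appender appender = space.getAppender(meta, schema, tablePath);
    appender.enableStats();
    appender.init();
    // ... addTuple() calls would go here ...
    appender.close();

    // Read path: file scanner over the whole file written above.
    Scanner scanner = space.getFileScanner(meta, schema, tablePath);
    scanner.init();
    // ... next() calls would go here ...
    scanner.close();
  }
}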
