Repository: tajo Updated Branches: refs/heads/master 8d513a5e8 -> 8c8048a44
TAJO-2004: S3TableSpace needs to extend FileTableSpace. Closes #898 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8c8048a4 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8c8048a4 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8c8048a4 Branch: refs/heads/master Commit: 8c8048a44cb63073a1a5fda1a653e239558b96cb Parents: 8d513a5 Author: JaeHwa Jung <[email protected]> Authored: Thu Dec 3 15:12:26 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu Dec 3 15:12:26 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/storage/s3/S3TableSpace.java | 1107 +----------------- 2 files changed, 3 insertions(+), 1106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8c8048a4/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9903944..a1ba896 100644 --- a/CHANGES +++ b/CHANGES @@ -137,6 +137,8 @@ Release 0.12.0 - unreleased SUB TASKS + TAJO-2004: S3TableSpace needs to extend FileTableSpace. (jaehwa) + TAJO-1960: Separate HDFS space handler and S3 space handler. (jaehwa) TAJO-1856: Add a description about the relationship of tablespace, managed table, http://git-wip-us.apache.org/repos/asf/tajo/blob/8c8048a4/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java index 7ac5425..69eab9a 100644 --- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java @@ -52,1113 +52,8 @@ import java.net.URI; import java.text.NumberFormat; import java.util.*; -public class S3TableSpace extends Tablespace { - - public static final PathFilter hiddenFileFilter = new PathFilter() { - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }; - private final Log LOG = LogFactory.getLog(S3TableSpace.class); - - static final String OUTPUT_FILE_PREFIX="part-"; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(2); - return fmt; - } - }; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(6); - return fmt; - } - }; - - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - return fmt; - } - }; - - private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true, false); - private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true); - - protected FileSystem fs; - protected Path spacePath; - protected Path stagingRootPath; - protected boolean blocksMetadataEnabled; - private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); - +public class S3TableSpace extends FileTablespace { public S3TableSpace(String spaceName, URI uri, JSONObject config) { super(spaceName, uri, config); } - - @Override - protected void storageInit() throws IOException { - this.spacePath = new Path(uri); - this.fs = spacePath.getFileSystem(conf); - this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR))); - this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString()); - - this.blocksMetadataEnabled = - conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); - - if (!this.blocksMetadataEnabled) { - LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); - } - } - - @Override - public long getTableVolume(URI uri) throws UnsupportedException { - Path path = new Path(uri); - ContentSummary summary; - try { - summary = fs.getContentSummary(path); - } catch (IOException e) { - throw new TajoInternalError(e); - } - return summary.getLength(); - } - - @Override - public URI getRootUri() { - return fs.getUri(); - } - - public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) - throws IOException { - FileStatus status = fs.getFileStatus(path); - return getFileScanner(meta, schema, path, status); - } - - public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) - throws IOException { - Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - return getScanner(meta, schema, fragment, null); - } - - public FileSystem getFileSystem() { - return this.fs; - } - - public void delete(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - fs.delete(tablePath, true); - } - - public boolean exists(Path path) throws IOException { - FileSystem fileSystem = path.getFileSystem(conf); - return fileSystem.exists(path); - } - - @Override - public URI getTableUri(String databaseName, String tableName) { - return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); - } - - @VisibleForTesting - public Appender getAppender(TableMeta meta, Schema schema, Path filePath) - throws IOException { - return getAppender(null, null, meta, schema, filePath); - } - - public FileFragment[] split(String tableName) throws IOException { - Path tablePath = new Path(spacePath, tableName); - return split(tableName, tablePath, fs.getDefaultBlockSize()); - } - - public FileFragment[] split(String tableName, long fragmentSize) throws IOException { - Path tablePath = new Path(spacePath, tableName); - return split(tableName, tablePath, fragmentSize); - } - - public FileFragment[] split(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); - } - - public FileFragment[] split(String tableName, Path tablePath) throws IOException { - return split(tableName, tablePath, fs.getDefaultBlockSize()); - } - - private FileFragment[] split(String tableName, Path tablePath, long size) - throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - - long defaultBlockSize = size; - List<FileFragment> listTablets = new ArrayList<>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - long remainFileSize = file.getLen(); - long start = 0; - if (remainFileSize > defaultBlockSize) { - while (remainFileSize > defaultBlockSize) { - tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); - listTablets.add(tablet); - start += defaultBlockSize; - remainFileSize -= defaultBlockSize; - } - listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); - } else { - listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, - Path tablePath, long size) - throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - - long defaultBlockSize = size; - List<FileFragment> listTablets = new ArrayList<>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - long remainFileSize = file.getLen(); - long start = 0; - if (remainFileSize > defaultBlockSize) { - while (remainFileSize > defaultBlockSize) { - tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); - listTablets.add(tablet); - start += defaultBlockSize; - remainFileSize -= defaultBlockSize; - } - listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); - } else { - listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public long calculateSize(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - long totalSize = 0; - - if (fs.exists(tablePath)) { - totalSize = fs.getContentSummary(tablePath).getLength(); - } - - return totalSize; - } - - ///////////////////////////////////////////////////////////////////////////// - // FileInputFormat Area - ///////////////////////////////////////////////////////////////////////////// - public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) { - if (taskAttemptId == null) { - // For testcase - return workDir; - } - // The final result of a task will be written in a file named part-ss-nnnnnnn, - // where ss is the stage id associated with this task, and nnnnnn is the task id. - Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, - OUTPUT_FILE_PREFIX + - OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + - OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" + - OUTPUT_FILE_FORMAT_SEQ.get().format(0)); - LOG.info("Output File Path: " + outFilePath); - - return outFilePath; - } - - /** - * Proxy PathFilter that accepts a path only if all filters given in the - * constructor do. Used by the listPaths() to apply the built-in - * hiddenFileFilter together with a user provided one (if any). - */ - private static class MultiPathFilter implements PathFilter { - private List<PathFilter> filters; - - public MultiPathFilter(List<PathFilter> filters) { - this.filters = filters; - } - - public boolean accept(Path path) { - for (PathFilter filter : filters) { - if (!filter.accept(path)) { - return false; - } - } - return true; - } - } - - /** - * List input directories. - * Subclasses may override to, e.g., select only files matching a regular - * expression. - * - * @return array of FileStatus objects - * @throws IOException if zero items. - */ - protected List<FileStatus> listStatus(Path... dirs) throws IOException { - List<FileStatus> result = new ArrayList<>(); - if (dirs.length == 0) { - throw new IOException("No input paths specified in job"); - } - - List<IOException> errors = new ArrayList<>(); - - // creates a MultiPathFilter with the hiddenFileFilter and the - // user provided one (if any). - List<PathFilter> filters = new ArrayList<>(); - filters.add(hiddenFileFilter); - - PathFilter inputFilter = new MultiPathFilter(filters); - - for (int i = 0; i < dirs.length; ++i) { - Path p = dirs[i]; - - FileStatus[] matches = fs.globStatus(p, inputFilter); - if (matches == null) { - LOG.warn("Input path does not exist: " + p); - } else if (matches.length == 0) { - LOG.warn("Input Pattern " + p + " matches 0 files"); - } else { - for (FileStatus globStat : matches) { - if (globStat.isDirectory()) { - for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) { - result.add(stat); - } - } else { - result.add(globStat); - } - } - } - } - - if (!errors.isEmpty()) { - throw new InvalidInputException(errors); - } - LOG.info("Total input paths to process : " + result.size()); - return result; - } - - /** - * Is the given filename splitable? Usually, true, but if the file is - * stream compressed, it will not be. - * <p/> - * <code>FileInputFormat</code> implementations can override this and return - * <code>false</code> to ensure that individual input files are never split-up - * so that Mappers process entire files. - * - * - * @param path the file name to check - * @param status get the file length - * @return is this file isSplittable? - */ - protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { - Scanner scanner = getFileScanner(meta, schema, path, status); - boolean split = scanner.isSplittable(); - scanner.close(); - return split; - } - - private static final double SPLIT_SLOP = 1.1; // 10% slop - - protected int getBlockIndex(BlockLocation[] blkLocations, - long offset) { - for (int i = 0; i < blkLocations.length; i++) { - // is the offset inside this block? - if ((blkLocations[i].getOffset() <= offset) && - (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1; - throw new IllegalArgumentException("Offset " + offset + - " is outside of file (0.." + - fileLength + ")"); - } - - /** - * A factory that makes the split for this class. It can be overridden - * by sub-classes to make sub-types - */ - protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { - return new FileFragment(fragmentId, file, start, length); - } - - protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, - String[] hosts) { - return new FileFragment(fragmentId, file, start, length, hosts); - } - - protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) - throws IOException { - return new FileFragment(fragmentId, file, blockLocation); - } - - // for Non Splittable. eg, compressed gzip TextFile - protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, - BlockLocation[] blkLocations) throws IOException { - - Map<String, Integer> hostsBlockMap = new HashMap<>(); - for (BlockLocation blockLocation : blkLocations) { - for (String host : blockLocation.getHosts()) { - if (hostsBlockMap.containsKey(host)) { - hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); - } else { - hostsBlockMap.put(host, 1); - } - } - } - - List<Map.Entry<String, Integer>> entries = new ArrayList<>(hostsBlockMap.entrySet()); - Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { - - @Override - public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { - return v1.getValue().compareTo(v2.getValue()); - } - }); - - String[] hosts = new String[blkLocations[0].getHosts().length]; - - for (int i = 0; i < hosts.length; i++) { - Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); - hosts[i] = entry.getKey(); - } - return new FileFragment(fragmentId, file, start, length, hosts); - } - - /** - * Get the minimum split size - * - * @return the minimum number of bytes that can be in a split - */ - public long getMinSplitSize() { - return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); - } - - /** - * Get Disk Ids by Volume Bytes - */ - private int[] getDiskIds(VolumeId[] volumeIds) { - int[] diskIds = new int[volumeIds.length]; - for (int i = 0; i < volumeIds.length; i++) { - int diskId = -1; - if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { - diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); - } - diskIds[i] = diskId; - } - return diskIds; - } - - /** - * Generate the list of files and make them into FileSplits. - * - * @throws IOException - */ - public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs) - throws IOException { - // generate splits' - - List<Fragment> splits = Lists.newArrayList(); - List<Fragment> volumeSplits = Lists.newArrayList(); - List<BlockLocation> blockLocations = Lists.newArrayList(); - - for (Path p : inputs) { - ArrayList<FileStatus> files = Lists.newArrayList(); - if (fs.isFile(p)) { - files.addAll(Lists.newArrayList(fs.getFileStatus(p))); - } else { - files.addAll(listStatus(p)); - } - - int previousSplitSize = splits.size(); - for (FileStatus file : files) { - Path path = file.getPath(); - long length = file.getLen(); - if (length > 0) { - // Get locations of blocks of file - BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - boolean splittable = isSplittable(meta, schema, path, file); - if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { - - if (splittable) { - for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); - } - blockLocations.addAll(Arrays.asList(blkLocations)); - - } else { // Non splittable - long blockSize = blkLocations[0].getLength(); - if (blockSize >= length) { - blockLocations.addAll(Arrays.asList(blkLocations)); - for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); - } - } else { - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); - } - } - - } else { - if (splittable) { - - long minSize = Math.max(getMinSplitSize(), 1); - - long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one - long splitSize = Math.max(minSize, blockSize); - long bytesRemaining = length; - - // for s3 - while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); - bytesRemaining -= splitSize; - } - if (bytesRemaining > 0) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); - } - } else { // Non splittable - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); - } - } - } - } - if(LOG.isDebugEnabled()){ - LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); - } - } - - // Combine original fileFragments with new VolumeId information - setVolumeMeta(volumeSplits, blockLocations); - splits.addAll(volumeSplits); - LOG.info("Total # of splits: " + splits.size()); - return splits; - } - - private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) - throws IOException { - - int locationSize = blockLocations.size(); - int splitSize = splits.size(); - if (locationSize == 0 || splitSize == 0) return; - - if (locationSize != splitSize) { - // splits and locations don't match up - LOG.warn("Number of block locations not equal to number of splits: " - + "#locations=" + locationSize - + " #splits=" + splitSize); - return; - } - - DistributedFileSystem fs = (DistributedFileSystem) this.fs; - int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); - int blockLocationIdx = 0; - - Iterator<Fragment> iter = splits.iterator(); - while (locationSize > blockLocationIdx) { - - int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); - List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); - //BlockStorageLocation containing additional volume location information for each replica of each block. - BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); - - for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { - ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); - blockLocationIdx++; - } - } - LOG.info("# of splits with volumeId " + splitSize); - } - - private static class InvalidInputException extends IOException { - List<IOException> errors; - public InvalidInputException(List<IOException> errors) { - this.errors = errors; - } - - @Override - public String getMessage(){ - StringBuffer sb = new StringBuffer(); - int messageLimit = Math.min(errors.size(), 10); - for (int i = 0; i < messageLimit ; i ++) { - sb.append(errors.get(i).getMessage()).append("\n"); - } - - if(messageLimit < errors.size()) - sb.append("skipped .....").append("\n"); - - return sb.toString(); - } - } - - @Override - public List<Fragment> getSplits(String inputSourceId, - TableDesc table, - @Nullable EvalNode filterCondition) throws IOException { - return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri())); - } - - @Override - public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { - if (!tableDesc.isExternal()) { - String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); - String databaseName = splitted[0]; - String simpleTableName = splitted[1]; - - // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) - Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName); - tableDesc.setUri(tablePath.toUri()); - } else { - Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given."); - } - - Path path = new Path(tableDesc.getUri()); - - FileSystem fs = path.getFileSystem(conf); - TableStats stats = new TableStats(); - if (tableDesc.isExternal()) { - if (!fs.exists(path)) { - LOG.error(path.toUri() + " does not exist"); - throw new IOException("ERROR: " + path.toUri() + " does not exist"); - } - } else { - fs.mkdirs(path); - } - - long totalSize = 0; - - try { - totalSize = calculateSize(path); - } catch (IOException e) { - LOG.warn("Cannot calculate the size of the relation", e); - } - - stats.setNumBytes(totalSize); - - if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. - stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); - } - - tableDesc.setStats(stats); - } - - @Override - public void purgeTable(TableDesc tableDesc) throws IOException { - try { - Path path = new Path(tableDesc.getUri()); - FileSystem fs = path.getFileSystem(conf); - LOG.info("Delete table data dir: " + path); - fs.delete(path, true); - } catch (IOException e) { - throw new InternalError(e.getMessage()); - } - } - - @Override - public StorageProperty getProperty() { - return FileStorageProperties; - } - - @Override - public FormatProperty getFormatProperty(TableMeta meta) { - return GeneralFileProperties; - } - - @Override - public void close() { - } - - @Override - public void prepareTable(LogicalNode node) throws IOException { - } - - @Override - public void rollbackTable(LogicalNode node) throws IOException { - } - - @Override - public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { - String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, ""); - - Path stagingDir; - // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO - // So, this query results won't be materialized as a part of a table. - // The result will be temporarily written in the staging directory. - if (outputPath.isEmpty()) { - // for temporarily written in the storage directory - stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); - } else { - Tablespace space = TablespaceManager.get(outputPath); - if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation - // If this space allows move operation, the staging directory will be underneath the final output table uri. - stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId)); - } else { - stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); - } - } - - return stagingDir.toUri(); - } - - // query submission directory is private! - final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx-------- - public static final String TMP_STAGING_DIR_PREFIX = ".staging"; - - public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta) - throws IOException { - - String realUser; - String currentUser; - UserGroupInformation ugi; - ugi = UserGroupInformation.getLoginUser(); - realUser = ugi.getShortUserName(); - currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); - - - Path stagingDir = new Path(getStagingUri(context, queryId, meta)); - - //////////////////////////////////////////// - // Create Output Directory - //////////////////////////////////////////// - - if (fs.exists(stagingDir)) { - throw new IOException("The staging directory '" + stagingDir + "' already exists"); - } - fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - FileStatus fsStatus = fs.getFileStatus(stagingDir); - String owner = fsStatus.getOwner(); - - if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { - throw new IOException("The ownership on the user's query " + - "directory " + stagingDir + " is not as expected. " + - "It is owned by " + owner + ". The directory must " + - "be owned by the submitter " + currentUser + " or " + - "by " + realUser); - } - - if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { - LOG.info("Permissions on staging directory " + stagingDir + " are " + - "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + - "to correct value " + STAGING_DIR_PERMISSION); - fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - } - - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - fs.mkdirs(stagingResultDir); - - return stagingDir.toUri(); - } - - @Override - public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) { - } - - @Override - public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, - Schema schema, TableDesc tableDesc) throws IOException { - return commitOutputData(queryContext, true); - } - - @Override - public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, - Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) - throws IOException { - return null; - } - - /** - * Finalizes result data. Tajo stores result data in the staging directory. - * If the query fails, clean up the staging directory. - * Otherwise the query is successful, move to the final directory from the staging directory. - * - * @param queryContext The query property - * @param changeFileSeq If true change result file name with max sequence. - * @return Saved path - * @throws java.io.IOException - */ - protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { - Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; - if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { - finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI)); - try { - FileSystem fs = stagingResultDir.getFileSystem(conf); - - if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO - - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - ContentSummary summary = fs.getContentSummary(stagingResultDir); - - // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not. - boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); - - // If existing data doesn't need to keep, check if there are some files. - if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) - && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map<Path, Path> renameDirs = new HashMap<>(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. - Map<Path, Path> recoveryDirs = new HashMap<>(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } - - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } - - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } - - // Recovery renamed dirs - for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } - - throw new IOException(ioe.getMessage()); - } - } else { // no partition - try { - - // if the final output dir exists, move all contents to the temporary table dir. - // Otherwise, just make the final output dir. As a result, the final output dir will be empty. - if (fs.exists(finalOutputDir)) { - fs.mkdirs(oldTableDir); - - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.rename(status.getPath(), oldTableDir); - } - - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir); - } - - // Move the results to the final output dir. - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - - // Check the final output dir - committed = fs.exists(finalOutputDir); - - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - - // if commit is failed, recover the old data - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.delete(status.getPath(), true); - } - - for (FileStatus status : fs.listStatus(oldTableDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } - - throw new IOException(ioe.getMessage()); - } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); - - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table - - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - if (fs.exists(finalOutputDir)) { - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } else { - fs.rename(stagingResultDir, finalOutputDir); - } - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } - } - - // remove the staging directory if the final output dir is given. - Path stagingDirRoot = stagingDir.getParent(); - fs.delete(stagingDirRoot, true); - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); - } - } else { - finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - } - - return finalOutputDir; - } - - /** - * Attach the sequence number to the output file name and than move the file into the final result path. - * - * @param fs FileSystem - * @param stagingResultDir The staging result dir - * @param fileStatus The file status - * @param finalOutputPath Final output path - * @param nf Number format - * @param fileSeq The sequence number - * @throws java.io.IOException - */ - private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq, boolean changeFileSeq) throws IOException { - if (fileStatus.isDirectory()) { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (!fs.exists(finalSubPath)) { - fs.mkdirs(finalSubPath); - } - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); - for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); - } - } else { - throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); - } - } else { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (changeFileSeq) { - finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); - } - if (!fs.exists(finalSubPath.getParent())) { - fs.mkdirs(finalSubPath.getParent()); - } - if (fs.exists(finalSubPath)) { - throw new IOException("Already exists data file:" + finalSubPath); - } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); - if (success) { - LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } else { - LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } - } - } - } - - /** - * Removes the path of the parent. - * @param parentPath - * @param childPath - * @return - */ - private String extractSubPath(Path parentPath, Path childPath) { - String parentPathStr = parentPath.toUri().getPath(); - String childPathStr = childPath.toUri().getPath(); - - if (parentPathStr.length() > childPathStr.length()) { - return null; - } - - int index = childPathStr.indexOf(parentPathStr); - if (index != 0) { - return null; - } - - return childPathStr.substring(parentPathStr.length() + 1); - } - - /** - * Attach the sequence number to a path. - * - * @param path Path - * @param seq sequence number - * @param nf Number format - * @return New path attached with sequence number - * @throws java.io.IOException - */ - private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { - String[] tokens = path.getName().split("-"); - if (tokens.length != 4) { - throw new IOException("Wrong result file name:" + path); - } - return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); - } - - /** - * Make sure all files are moved. - * @param fs FileSystem - * @param stagingPath The stagind directory - * @return - * @throws java.io.IOException - */ - private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - if (eachFile.isFile()) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - return false; - } else { - if (verifyAllFileMoved(fs, eachFile.getPath())) { - fs.delete(eachFile.getPath(), false); - } else { - return false; - } - } - } - } - - return true; - } - - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. - * If there exists some data files, this delete it for duplicate data. - * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws java.io.IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. - String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } - } - } - } - - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; - - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; - } - } - - return retValue; - } - }
