Repository: tajo Updated Branches: refs/heads/master f2552bfb9 -> 3852ae3d0
TAJO-1960: Separate HDFS space handler and S3 space handler. Closes #868 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3852ae3d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3852ae3d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3852ae3d Branch: refs/heads/master Commit: 3852ae3d0d3fcb30ebff73f087ce34f3e1fc5bfb Parents: f2552bf Author: JaeHwa Jung <[email protected]> Authored: Thu Dec 3 12:29:09 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu Dec 3 12:29:09 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + tajo-storage/pom.xml | 2 + .../src/main/resources/storage-default.json | 8 +- tajo-storage/tajo-storage-s3/pom.xml | 233 ++++ .../apache/tajo/storage/s3/S3TableSpace.java | 1164 ++++++++++++++++++ .../tajo/storage/s3/MockS3FileSystem.java | 110 ++ .../tajo/storage/s3/TestS3TableSpace.java | 62 + 7 files changed, 1577 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e0fd693..688c2f5 100644 --- a/CHANGES +++ b/CHANGES @@ -135,6 +135,8 @@ Release 0.12.0 - unreleased SUB TASKS + TAJO-1960: Separate HDFS space handler and S3 space handler. (jaehwa) + TAJO-1856: Add a description about the relationship of tablespace, managed table, and external table to Tablespace section of Table Management chapter. (Contributed by Jongyoung Park. 
Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index a6b9b8a..4881e2c 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -39,6 +39,7 @@ <module>tajo-storage-hbase</module> <module>tajo-storage-jdbc</module> <module>tajo-storage-pgsql</module> + <module>tajo-storage-s3</module> </modules> <build> @@ -135,6 +136,7 @@ run cp -r ${basedir}/tajo-storage-common/target/tajo-storage-common-${project.version}*.jar . run cp -r ${basedir}/tajo-storage-hdfs/target/tajo-storage-hdfs-${project.version}*.jar . run cp -r ${basedir}/tajo-storage-hbase/target/tajo-storage-hbase-${project.version}*.jar . + run cp -r ${basedir}/tajo-storage-s3/target/tajo-storage-s3-${project.version}*.jar . echo echo "Tajo Storage dist layout available at: ${project.build.directory}/tajo-storage-${project.version}" http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json index a24f301..17ac3ba 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json @@ -17,19 +17,19 @@ "default-format": "rowstore" }, "s3": { - "handler": "org.apache.tajo.storage.FileTablespace", + "handler": "org.apache.tajo.storage.s3.S3TableSpace", "default-format": "text" }, "s3a": { - "handler": "org.apache.tajo.storage.FileTablespace", + "handler": "org.apache.tajo.storage.s3.S3TableSpace", "default-format": "text" }, "s3n": { - "handler": "org.apache.tajo.storage.FileTablespace", + "handler": 
"org.apache.tajo.storage.s3.S3TableSpace", "default-format": "text" }, "swift": { - "handler": "org.apache.tajo.storage.FileTablespace", + "handler": "org.apache.tajo.storage.s3.S3TableSpace", "default-format": "text" } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml new file mode 100644 index 0000000..a9a541a --- /dev/null +++ b/tajo-storage/tajo-storage-s3/pom.xml @@ -0,0 +1,233 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.12.0-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tajo-storage-s3</artifactId> + <packaging>jar</packaging> + <name>Tajo S3 storage</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/dataset/**</exclude> + <exclude>src/test/resources/queries/**</exclude> + <exclude>src/test/resources/results/**</exclude> + </excludes> + </configuration> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-json</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + 
</dependencies> + + <profiles> + <profile> + <!-- Run unit tests in tajo-storage-s3, whereas it is disabled as by default. --> + <id>test-storage-s3</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration combine.self="override"> + <systemProperties> + <tajo.test.enabled>TRUE</tajo.test.enabled> + </systemProperties> + <argLine>-Xms128m -Xmx1024m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + </plugin> + </plugins> + </reporting> +</project> http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java new file mode 100644 index 0000000..7ac5425 --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java @@ -0,0 +1,1164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import net.minidev.json.JSONObject; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.Bytes; + +import 
javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; +import java.text.NumberFormat; +import java.util.*; + +public class S3TableSpace extends Tablespace { + + public static final PathFilter hiddenFileFilter = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + private final Log LOG = LogFactory.getLog(S3TableSpace.class); + + static final String OUTPUT_FILE_PREFIX="part-"; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(6); + return fmt; + } + }; + + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + return fmt; + } + }; + + private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true, false); + private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true); + + protected FileSystem fs; + protected Path spacePath; + protected Path stagingRootPath; + protected boolean blocksMetadataEnabled; + private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); + + public S3TableSpace(String spaceName, URI uri, JSONObject config) { + super(spaceName, uri, config); + } + + @Override + protected void storageInit() throws IOException { + 
this.spacePath = new Path(uri); + this.fs = spacePath.getFileSystem(conf); + this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR))); + this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString()); + + this.blocksMetadataEnabled = + conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + + if (!this.blocksMetadataEnabled) { + LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); + } + } + + @Override + public long getTableVolume(URI uri) throws UnsupportedException { + Path path = new Path(uri); + ContentSummary summary; + try { + summary = fs.getContentSummary(path); + } catch (IOException e) { + throw new TajoInternalError(e); + } + return summary.getLength(); + } + + @Override + public URI getRootUri() { + return fs.getUri(); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) + throws IOException { + FileStatus status = fs.getFileStatus(path); + return getFileScanner(meta, schema, path, status); + } + + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) + throws IOException { + Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + return getScanner(meta, schema, fragment, null); + } + + public FileSystem getFileSystem() { + return this.fs; + } + + public void delete(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + fs.delete(tablePath, true); + } + + public boolean exists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + return fileSystem.exists(path); + } + + @Override + public URI getTableUri(String databaseName, String tableName) { + return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); + } + + @VisibleForTesting + public Appender getAppender(TableMeta meta, Schema schema, Path filePath) + throws IOException { + 
return getAppender(null, null, meta, schema, filePath); + } + + public FileFragment[] split(String tableName) throws IOException { + Path tablePath = new Path(spacePath, tableName); + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, long fragmentSize) throws IOException { + Path tablePath = new Path(spacePath, tableName); + return split(tableName, tablePath, fragmentSize); + } + + public FileFragment[] split(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); + } + + public FileFragment[] split(String tableName, Path tablePath) throws IOException { + return split(tableName, tablePath, fs.getDefaultBlockSize()); + } + + private FileFragment[] split(String tableName, Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, + Path tablePath, long size) + throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + + long 
defaultBlockSize = size; + List<FileFragment> listTablets = new ArrayList<>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus file : fileLists) { + long remainFileSize = file.getLen(); + long start = 0; + if (remainFileSize > defaultBlockSize) { + while (remainFileSize > defaultBlockSize) { + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); + listTablets.add(tablet); + start += defaultBlockSize; + remainFileSize -= defaultBlockSize; + } + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); + } else { + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + public long calculateSize(Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(conf); + long totalSize = 0; + + if (fs.exists(tablePath)) { + totalSize = fs.getContentSummary(tablePath).getLength(); + } + + return totalSize; + } + + ///////////////////////////////////////////////////////////////////////////// + // FileInputFormat Area + ///////////////////////////////////////////////////////////////////////////// + public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) { + if (taskAttemptId == null) { + // For testcase + return workDir; + } + // The final result of a task will be written in a file named part-ss-nnnnnnn, + // where ss is the stage id associated with this task, and nnnnnn is the task id. 
+ Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, + OUTPUT_FILE_PREFIX + + OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + + OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" + + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); + LOG.info("Output File Path: " + outFilePath); + + return outFilePath; + } + + /** + * Proxy PathFilter that accepts a path only if all filters given in the + * constructor do. Used by the listPaths() to apply the built-in + * hiddenFileFilter together with a user provided one (if any). + */ + private static class MultiPathFilter implements PathFilter { + private List<PathFilter> filters; + + public MultiPathFilter(List<PathFilter> filters) { + this.filters = filters; + } + + public boolean accept(Path path) { + for (PathFilter filter : filters) { + if (!filter.accept(path)) { + return false; + } + } + return true; + } + } + + /** + * List input directories. + * Subclasses may override to, e.g., select only files matching a regular + * expression. + * + * @return array of FileStatus objects + * @throws IOException if zero items. + */ + protected List<FileStatus> listStatus(Path... dirs) throws IOException { + List<FileStatus> result = new ArrayList<>(); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + + List<IOException> errors = new ArrayList<>(); + + // creates a MultiPathFilter with the hiddenFileFilter and the + // user provided one (if any). 
+ List<PathFilter> filters = new ArrayList<>(); + filters.add(hiddenFileFilter); + + PathFilter inputFilter = new MultiPathFilter(filters); + + for (int i = 0; i < dirs.length; ++i) { + Path p = dirs[i]; + + FileStatus[] matches = fs.globStatus(p, inputFilter); + if (matches == null) { + LOG.warn("Input path does not exist: " + p); + } else if (matches.length == 0) { + LOG.warn("Input Pattern " + p + " matches 0 files"); + } else { + for (FileStatus globStat : matches) { + if (globStat.isDirectory()) { + for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) { + result.add(stat); + } + } else { + result.add(globStat); + } + } + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result; + } + + /** + * Is the given filename splitable? Usually, true, but if the file is + * stream compressed, it will not be. + * <p/> + * <code>FileInputFormat</code> implementations can override this and return + * <code>false</code> to ensure that individual input files are never split-up + * so that Mappers process entire files. + * + * + * @param path the file name to check + * @param status get the file length + * @return is this file isSplittable? + */ + protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { + Scanner scanner = getFileScanner(meta, schema, path, status); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + private static final double SPLIT_SLOP = 1.1; // 10% slop + + protected int getBlockIndex(BlockLocation[] blkLocations, + long offset) { + for (int i = 0; i < blkLocations.length; i++) { + // is the offset inside this block? 
+ if ((blkLocations[i].getOffset() <= offset) && + (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1; + throw new IllegalArgumentException("Offset " + offset + + " is outside of file (0.." + + fileLength + ")"); + } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { + return new FileFragment(fragmentId, file, start, length); + } + + protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, + String[] hosts) { + return new FileFragment(fragmentId, file, start, length, hosts); + } + + protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) + throws IOException { + return new FileFragment(fragmentId, file, blockLocation); + } + + // for Non Splittable. 
eg, compressed gzip TextFile + protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations) throws IOException { + + Map<String, Integer> hostsBlockMap = new HashMap<>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List<Map.Entry<String, Integer>> entries = new ArrayList<>(hostsBlockMap.entrySet()); + Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { + + @Override + public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { + return v1.getValue().compareTo(v2.getValue()); + } + }); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + return new FileFragment(fragmentId, file, start, length, hosts); + } + + /** + * Get the minimum split size + * + * @return the minimum number of bytes that can be in a split + */ + public long getMinSplitSize() { + return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); + } + + /** + * Get Disk Ids by Volume Bytes + */ + private int[] getDiskIds(VolumeId[] volumeIds) { + int[] diskIds = new int[volumeIds.length]; + for (int i = 0; i < volumeIds.length; i++) { + int diskId = -1; + if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { + diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); + } + diskIds[i] = diskId; + } + return diskIds; + } + + /** + * Generate the list of files and make them into FileSplits. + * + * @throws IOException + */ + public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... 
inputs) + throws IOException { + // generate splits' + + List<Fragment> splits = Lists.newArrayList(); + List<Fragment> volumeSplits = Lists.newArrayList(); + List<BlockLocation> blockLocations = Lists.newArrayList(); + + for (Path p : inputs) { + ArrayList<FileStatus> files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + int previousSplitSize = splits.size(); + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittable(meta, schema, path, file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + + } else { + if (splittable) { + + long minSize = Math.max(getMinSplitSize(), 1); + + long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; + + // for s3 + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts())); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int 
blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts())); + } + } else { // Non splittable + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } + } + } + } + if(LOG.isDebugEnabled()){ + LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); + } + } + + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); + LOG.info("Total # of splits: " + splits.size()); + return splits; + } + + private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) + throws IOException { + + int locationSize = blockLocations.size(); + int splitSize = splits.size(); + if (locationSize == 0 || splitSize == 0) return; + + if (locationSize != splitSize) { + // splits and locations don't match up + LOG.warn("Number of block locations not equal to number of splits: " + + "#locations=" + locationSize + + " #splits=" + splitSize); + return; + } + + DistributedFileSystem fs = (DistributedFileSystem) this.fs; + int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); + int blockLocationIdx = 0; + + Iterator<Fragment> iter = splits.iterator(); + while (locationSize > blockLocationIdx) { + + int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); + List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); + //BlockStorageLocation containing additional volume location information for each replica of each block. 
+ BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); + + for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { + ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); + blockLocationIdx++; + } + } + LOG.info("# of splits with volumeId " + splitSize); + } + + private static class InvalidInputException extends IOException { + List<IOException> errors; + public InvalidInputException(List<IOException> errors) { + this.errors = errors; + } + + @Override + public String getMessage(){ + StringBuffer sb = new StringBuffer(); + int messageLimit = Math.min(errors.size(), 10); + for (int i = 0; i < messageLimit ; i ++) { + sb.append(errors.get(i).getMessage()).append("\n"); + } + + if(messageLimit < errors.size()) + sb.append("skipped .....").append("\n"); + + return sb.toString(); + } + } + + @Override + public List<Fragment> getSplits(String inputSourceId, + TableDesc table, + @Nullable EvalNode filterCondition) throws IOException { + return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri())); + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + if (!tableDesc.isExternal()) { + String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); + String databaseName = splitted[0]; + String simpleTableName = splitted[1]; + + // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) + Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName); + tableDesc.setUri(tablePath.toUri()); + } else { + Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given."); + } + + Path path = new Path(tableDesc.getUri()); + + FileSystem fs = path.getFileSystem(conf); + TableStats stats = new TableStats(); + if (tableDesc.isExternal()) { + if (!fs.exists(path)) { + LOG.error(path.toUri() + " does not exist"); + throw new 
IOException("ERROR: " + path.toUri() + " does not exist"); + } + } else { + fs.mkdirs(path); + } + + long totalSize = 0; + + try { + totalSize = calculateSize(path); + } catch (IOException e) { + LOG.warn("Cannot calculate the size of the relation", e); + } + + stats.setNumBytes(totalSize); + + if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + } + + tableDesc.setStats(stats); + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + try { + Path path = new Path(tableDesc.getUri()); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Delete table data dir: " + path); + fs.delete(path, true); + } catch (IOException e) { + throw new InternalError(e.getMessage()); + } + } + + @Override + public StorageProperty getProperty() { + return FileStorageProperties; + } + + @Override + public FormatProperty getFormatProperty(TableMeta meta) { + return GeneralFileProperties; + } + + @Override + public void close() { + } + + @Override + public void prepareTable(LogicalNode node) throws IOException { + } + + @Override + public void rollbackTable(LogicalNode node) throws IOException { + } + + @Override + public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { + String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, ""); + + Path stagingDir; + // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO + // So, this query results won't be materialized as a part of a table. + // The result will be temporarily written in the staging directory. 
+ if (outputPath.isEmpty()) { + // for temporarily written in the storage directory + stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); + } else { + Tablespace space = TablespaceManager.get(outputPath); + if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation + // If this space allows move operation, the staging directory will be underneath the final output table uri. + stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId)); + } else { + stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); + } + } + + return stagingDir.toUri(); + } + + // query submission directory is private! + final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx-------- + public static final String TMP_STAGING_DIR_PREFIX = ".staging"; + + public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta) + throws IOException { + + String realUser; + String currentUser; + UserGroupInformation ugi; + ugi = UserGroupInformation.getLoginUser(); + realUser = ugi.getShortUserName(); + currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + + + Path stagingDir = new Path(getStagingUri(context, queryId, meta)); + + //////////////////////////////////////////// + // Create Output Directory + //////////////////////////////////////////// + + if (fs.exists(stagingDir)) { + throw new IOException("The staging directory '" + stagingDir + "' already exists"); + } + fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + FileStatus fsStatus = fs.getFileStatus(stagingDir); + String owner = fsStatus.getOwner(); + + if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { + throw new IOException("The ownership on the user's query " + + "directory " + stagingDir + " is not as expected. " + + "It is owned by " + owner + ". 
The directory must " + + "be owned by the submitter " + currentUser + " or " + + "by " + realUser); + } + + if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { + LOG.info("Permissions on staging directory " + stagingDir + " are " + + "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + + "to correct value " + STAGING_DIR_PERMISSION); + fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + } + + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.mkdirs(stagingResultDir); + + return stagingDir.toUri(); + } + + @Override + public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) { + } + + @Override + public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, + Schema schema, TableDesc tableDesc) throws IOException { + return commitOutputData(queryContext, true); + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) + throws IOException { + return null; + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param changeFileSeq If true change result file name with max sequence. 
+ * @return Saved path + * @throws java.io.IOException + */ + protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + Path finalOutputDir; + if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { + finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI)); + try { + FileSystem fs = stagingResultDir.getFileSystem(conf); + + if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + ContentSummary summary = fs.getContentSummary(stagingResultDir); + + // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not. + boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); + + // If existing data doesn't need to keep, check if there are some files. + if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) + && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. + Map<Path, Path> renameDirs = new HashMap<>(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. 
+ Map<Path, Path> recoveryDirs = new HashMap<>(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } + + // Recovery renamed dirs + for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } + + throw new IOException(ioe.getMessage()); + } + } else { // no partition + try { + + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. + if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); + + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } + + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } + + // Move the results to the final output dir. 
+ for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + + // Check the final output dir + committed = fs.exists(finalOutputDir); + + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } + + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } + + throw new IOException(ioe.getMessage()); + } + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + + if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); + } + } else { + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } + } else { // CREATE TABLE AS SELECT (CTAS) + if 
(fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } else { + fs.rename(stagingResultDir, finalOutputDir); + } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } + } + + // remove the staging directory if the final output dir is given. + Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + + return finalOutputDir; + } + + /** + * Attach the sequence number to the output file name and than move the file into the final result path. + * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws java.io.IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq, boolean changeFileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, 
fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (changeFileSeq) { + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + } + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } + } + } + } + + /** + * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws java.io.IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Make sure all files are moved. 
+ * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws java.io.IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. + * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws java.io.IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. 
+ String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java new file mode 100644 index 0000000..15c6a33 --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tajo.storage.s3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;
import java.net.URI;

/**
 * A minimal, no-op {@link FileSystem} stub registered as "fs.s3.impl" by
 * TestS3TableSpace so that an S3TableSpace can be initialized without any
 * real S3 connectivity. All data operations return null/false/empty values.
 */
public class MockS3FileSystem extends FileSystem {
  // scheme://authority portion of the URI this mock was initialized with
  private URI uri;

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);

    setConf(conf);
    // Keep only scheme + authority; any path component is deliberately dropped.
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  }

  /**
   * Return the protocol scheme for the FileSystem.
   * <p/>
   *
   * @return <code>s3</code>
   */
  @Override
  public String getScheme() {
    return "s3";
  }

  @Override
  public URI getUri() {
    return uri;
  }

  // Identity: no qualification is performed in this mock.
  @Override
  public Path makeQualified(Path path) {
    return path;
  }

  // --- All remaining operations are inert stubs: no file is ever read,
  // --- written, renamed, or deleted by this mock.

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return null;
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
                                   short replication, long blockSize, Progressable progress) throws IOException {
    return null;
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
    return null;
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    return false;
  }

  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return false;
  }

  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return new FileStatus[0];
  }

  @Override
  public void setWorkingDirectory(Path new_dir) {
  }

  @Override
  public Path getWorkingDirectory() {
    return null;
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return false;
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    return null;
  }
}
 Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.s3;

import net.minidev.json.JSONObject;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.TablespaceManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Verifies that an S3TableSpace registered under an s3:// URI is resolvable
 * through TablespaceManager by both name and URI. Uses MockS3FileSystem so
 * that no real S3 endpoint is contacted.
 */
public class TestS3TableSpace {
  // Logical tablespace name used for registration/lookup.
  public static final String SPACENAME = "s3_cluster";
  // Root URI of the test tablespace.
  public static final String S3_URI = "s3://tajo-test/";

  @BeforeClass
  public static void setUp() throws Exception {
    S3TableSpace tablespace = new S3TableSpace(SPACENAME, URI.create(S3_URI), new JSONObject());

    TajoConf tajoConf = new TajoConf();
    // Route the s3 scheme to the in-memory mock so init() never touches S3.
    tajoConf.set("fs.s3.impl", MockS3FileSystem.class.getName());
    tablespace.init(tajoConf);

    TablespaceManager.addTableSpaceForTest(tablespace);
  }

  @AfterClass
  public static void tearDown() throws IOException {
    // Unregister so other test classes see a clean TablespaceManager.
    TablespaceManager.removeTablespaceForTest(SPACENAME);
  }

  @Test
  public void testTablespaceHandler() throws Exception {
    // Lookup by logical name must return the S3 handler.
    assertTrue((TablespaceManager.getByName(SPACENAME)) instanceof S3TableSpace);
    assertEquals(SPACENAME, (TablespaceManager.getByName(SPACENAME).getName()));

    // Lookup by URI must return the same handler with the registered root URI.
    assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace);
    assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString());
  }
}
