This is an automated email from the ASF dual-hosted git repository. gparai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 7108f162cd4f18121aa9a8ace76326bd5fbf8264 Author: Arina Ielchiieva <[email protected]> AuthorDate: Fri Dec 28 19:45:38 2018 +0200 DRILL-6931: File listing: fix issue for S3 directory objects and improve performance for recursive listing closes #1590 --- .../planner/sql/handlers/ShowFilesHandler.java | 8 +- .../store/ischema/InfoSchemaRecordGenerator.java | 3 +- .../org/apache/drill/exec/util/FileSystemUtil.java | 230 ++++++++++++--------- 3 files changed, 144 insertions(+), 97 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java index 9782bbf..3398340 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java @@ -50,7 +50,7 @@ public class ShowFilesHandler extends DefaultSqlHandler { SchemaPlus drillSchema = defaultSchema; SqlShowFiles showFiles = unwrap(sqlNode, SqlShowFiles.class); SqlIdentifier from = showFiles.getDb(); - String fromDir = "./"; + String fromDir = null; // Show files can be used without from clause, in which case we display the files in the default schema if (from != null) { @@ -61,7 +61,7 @@ public class ShowFilesHandler extends DefaultSqlHandler { // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause. drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1)); // Listing for specific directory: show files in dfs.tmp.specific_directory - fromDir = fromDir + from.names.get((from.names.size() - 1)); + fromDir = from.names.get((from.names.size() - 1)); } if (drillSchema == null) { @@ -81,7 +81,9 @@ public class ShowFilesHandler extends DefaultSqlHandler { .build(logger); } - Path path = new Path(wsSchema.getDefaultLocation(), fromDir); + Path endPath = fromDir == null ? new Path(wsSchema.getDefaultLocation()) : new Path(wsSchema.getDefaultLocation(), fromDir); + // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method) + Path path = new Path(wsSchema.getFS().getUri().toString(), endPath); List<ShowFilesCommandResult> records = FileSystemUtil.listAllSafe(wsSchema.getFS(), path, false).stream() // use ShowFilesCommandResult for backward compatibility .map(fileStatus -> new ShowFilesCommandResult(new Records.File(wsSchema.getFullSchemaName(), wsSchema, fileStatus))) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java index 1e72840..bb49e17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java @@ -435,7 +435,8 @@ public abstract class InfoSchemaRecordGenerator<S> { String defaultLocation = wsSchema.getDefaultLocation(); FileSystem fs = wsSchema.getFS(); boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY); - FileSystemUtil.listAllSafe(fs, new Path(defaultLocation), recursive).forEach( + // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method) + FileSystemUtil.listAllSafe(fs, new Path(fs.getUri().toString(), defaultLocation), recursive).forEach( fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus)) ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java index 47ac44c..82500da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.util; +import org.apache.drill.common.exceptions.ErrorHelper; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -25,7 +26,12 @@ import org.apache.hadoop.fs.PathFilter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveTask; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -42,6 +48,15 @@ public class FileSystemUtil { public static final PathFilter DUMMY_FILTER = path -> true; /** + * Indicates which file system objects should be returned during listing. + */ + private enum Scope { + DIRECTORIES, + FILES, + ALL + } + + /** * Returns statuses of all directories present in given path applying custom filters if present. * Will also include nested directories if recursive flag is set to true. * @@ -51,10 +66,8 @@ public class FileSystemUtil { * @param filters list of custom filters (optional) * @return list of matching directory statuses */ - public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException { - List<FileStatus> statuses = new ArrayList<>(); - listDirectories(fs, path, recursive, false, statuses, mergeFilters(filters)); - return statuses; + public static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException { + return list(fs, path, Scope.DIRECTORIES, recursive, false, filters); } /** @@ -68,14 +81,13 @@ public class FileSystemUtil { * @param filters list of custom filters (optional) * @return list of matching directory statuses */ - public static List<FileStatus> listDirectoriesSafe(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) { - List<FileStatus> statuses = new ArrayList<>(); + public static List<FileStatus> listDirectoriesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) { try { - listDirectories(fs, path, recursive, true, statuses, mergeFilters(filters)); + return list(fs, path, Scope.DIRECTORIES, recursive, true, filters); } catch (Exception e) { // all exceptions are ignored + return Collections.emptyList(); } - return statuses; } /** @@ -89,9 +101,7 @@ public class FileSystemUtil { * @return list of matching file statuses */ public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException { - List<FileStatus> statuses = new ArrayList<>(); - listFiles(fs, path, recursive, false, statuses, mergeFilters(filters)); - return statuses; + return list(fs, path, Scope.FILES, recursive, false, filters); } /** @@ -105,13 +115,12 @@ public class FileSystemUtil { * @return list of matching file statuses */ public static List<FileStatus> listFilesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) { - List<FileStatus> statuses = new ArrayList<>(); try { - listFiles(fs, path, recursive, true, statuses, mergeFilters(filters)); + return list(fs, path, Scope.FILES, recursive, true, filters); } catch (Exception e) { // all exceptions are ignored + return Collections.emptyList(); } - return statuses; } /** @@ -125,9 +134,7 @@ public class FileSystemUtil { * @return list of matching directory and file statuses */ public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException { - List<FileStatus> statuses = new ArrayList<>(); - listAll(fs, path, recursive, false, statuses, mergeFilters(filters)); - return statuses; + return list(fs, path, Scope.ALL, recursive, false, filters); } /** @@ -142,13 +149,12 @@ public class FileSystemUtil { * @return list of matching directory and file statuses */ public static List<FileStatus> listAllSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) { - List<FileStatus> statuses = new ArrayList<>(); try { - listAll(fs, path, recursive, true, statuses, mergeFilters(filters)); + return list(fs, path, Scope.ALL, recursive, true, filters); } catch (Exception e) { // all exceptions are ignored + return Collections.emptyList(); } - return statuses; } /** @@ -177,7 +183,7 @@ public class FileSystemUtil { * @param filters array of filters * @return one filter that combines all given filters */ - public static PathFilter mergeFilters(final PathFilter... filters) { + public static PathFilter mergeFilters(PathFilter... filters) { if (filters.length == 0) { return DUMMY_FILTER; } @@ -186,103 +192,141 @@ public class FileSystemUtil { } /** - * Helper method that will store in given holder statuses of all directories present in given path applying custom filter. - * If recursive flag is set to true, will call itself recursively to add statuses of nested directories. - * If suppress exceptions flag is set to true, will ignore all exceptions during listing. + * Helper method that merges given filters into one and + * determines which listing method should be called based on recursive flag value. * - * @param fs current file system - * @param path path to directory - * @param recursive true if nested directories should be included - * @param suppressExceptions indicates if exceptions should be ignored during listing - * @param statuses holder for directory statuses - * @param filter custom filter - * @return holder with all matching directory statuses + * @param fs file system + * @param path path to file or directory + * @param scope file system objects scope + * @param recursive indicates if listing should be done recursively + * @param suppressExceptions indicates if exceptions should be ignored + * @param filters filters to be applied + * @return list of file statuses */ - private static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions, - List<FileStatus> statuses, PathFilter filter) throws IOException { - try { - for (FileStatus status : fs.listStatus(path, filter)) { - if (status.isDirectory()) { - statuses.add(status); - if (recursive) { - listDirectories(fs, status.getPath(), true, suppressExceptions, statuses, filter); - } - } - } - } catch (Exception e) { - if (suppressExceptions) { - logger.debug("Exception during listing file statuses", e); - } else { - throw e; - } - } - return statuses; + private static List<FileStatus> list(FileSystem fs, Path path, Scope scope, boolean recursive, boolean suppressExceptions, PathFilter... filters) throws IOException { + PathFilter filter = mergeFilters(filters); + return recursive ? listRecursive(fs, path, scope, suppressExceptions, filter) + : listNonRecursive(fs, path, scope, suppressExceptions, filter); } /** - * Helper method that will store in given holder statuses of all files present in given path applying custom filter. - * If recursive flag is set to true, will call itself recursively to add file statuses from nested directories. - * If suppress exceptions flag is set to true, will ignore all exceptions during listing. + * Lists file statuses non-recursively based on given file system objects {@link Scope}. * - * @param fs current file system + * @param fs file system * @param path path to file or directory - * @param recursive true if files in nested directories should be included - * @param suppressExceptions indicates if exceptions should be ignored during listing - * @param statuses holder for file statuses - * @param filter custom filter - * @return holder with all matching file statuses + * @param scope file system objects scope + * @param suppressExceptions indicates if exceptions should be ignored + * @param filter filter to be applied + * @return list of file statuses */ - private static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions, - List<FileStatus> statuses, PathFilter filter) throws IOException { + private static List<FileStatus> listNonRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) throws IOException { try { - for (FileStatus status : fs.listStatus(path, filter)) { - if (status.isDirectory()) { - if (recursive) { - listFiles(fs, status.getPath(), true, suppressExceptions, statuses, filter); - } - } else { - statuses.add(status); - } - } + return Stream.of(fs.listStatus(path, filter)) + .filter(status -> isStatusApplicable(status, scope)) + .collect(Collectors.toList()); } catch (Exception e) { if (suppressExceptions) { logger.debug("Exception during listing file statuses", e); + return Collections.emptyList(); } else { throw e; } } - return statuses; } /** - * Helper method that will store in given holder statuses of all directories and files present in given path applying custom filter. - * If recursive flag is set to true, will call itself recursively to add nested directories and their file statuses. - * If suppress exceptions flag is set to true, will ignore all exceptions during listing. + * Lists file statuses recursively based on given file system objects {@link Scope}. + * Uses {@link ForkJoinPool} executor service and {@link RecursiveListing} task + * to parallel and speed up listing. * - * @param fs current file system + * @param fs file system * @param path path to file or directory - * @param recursive true if nested directories and their files should be included - * @param suppressExceptions indicates if exceptions should be ignored during listing - * @param statuses holder for directory and file statuses - * @param filter custom filter - * @return holder with all matching directory and file statuses + * @param scope file system objects scope + * @param suppressExceptions indicates if exceptions should be ignored + * @param filter filter to be applied + * @return list of file statuses */ - private static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions, - List<FileStatus> statuses, PathFilter filter) throws IOException { + private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) { + ForkJoinPool pool = new ForkJoinPool(); try { - for (FileStatus status : fs.listStatus(path, filter)) { - statuses.add(status); - if (status.isDirectory() && recursive) { - listAll(fs, status.getPath(), true, suppressExceptions, statuses, filter); + RecursiveListing task = new RecursiveListing(fs, path, scope, suppressExceptions, filter); + return pool.invoke(task); + } finally { + pool.shutdown(); + } + } + + /** + * Checks if file status is applicable based on file system object {@link Scope}. + * + * @param status file status + * @param scope file system objects scope + * @return true if status is applicable, false otherwise + */ + private static boolean isStatusApplicable(FileStatus status, Scope scope) { + switch (scope) { + case DIRECTORIES: + return status.isDirectory(); + case FILES: + return status.isFile(); + case ALL: + return true; + default: + return false; + } + } + + /** + * Task that parallels file status listing for each nested directory, + * gathers and returns common list of file statuses. + */ + private static class RecursiveListing extends RecursiveTask<List<FileStatus>> { + + private final FileSystem fs; + private final Path path; + private final Scope scope; + private final boolean suppressExceptions; + private final PathFilter filter; + + RecursiveListing(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) { + this.fs = fs; + this.path = path; + this.scope = scope; + this.suppressExceptions = suppressExceptions; + this.filter = filter; + } + + @Override + protected List<FileStatus> compute() { + List<FileStatus> statuses = new ArrayList<>(); + List<RecursiveListing> tasks = new ArrayList<>(); + + try { + for (FileStatus status : fs.listStatus(path, filter)) { + if (isStatusApplicable(status, scope)) { + statuses.add(status); + } + if (status.isDirectory()) { + RecursiveListing task = new RecursiveListing(fs, status.getPath(), scope, suppressExceptions, filter); + task.fork(); + tasks.add(task); + } + } + } catch (Exception e) { + if (suppressExceptions) { + logger.debug("Exception during listing file statuses", e); + } else { + // is used to re-throw checked exception + ErrorHelper.sneakyThrow(e); } } - } catch (Exception e) { - if (suppressExceptions) { - logger.debug("Exception during listing file statuses", e); - } else { - throw e; - } + + tasks.stream() + .map(ForkJoinTask::join) + .forEach(statuses::addAll); + + return statuses; } - return statuses; } + }
