This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new bd3ee4c DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0 bd3ee4c is described below commit bd3ee4cde5ced05362b538cd7aacbdab73ccef88 Author: Sorabh Hamirwasia <sor...@apache.org> AuthorDate: Wed Mar 20 13:29:40 2019 -0700 DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0 --- .../sql/handlers/RefreshMetadataHandler.java | 33 ++++---- .../exec/store/parquet/metadata/Metadata.java | 88 +++++++++++++--------- 2 files changed, 72 insertions(+), 49 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java index 0a02f03..d7bb036 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java @@ -17,18 +17,14 @@ */ package org.apache.drill.exec.planner.sql.handlers; -import java.util.HashSet; -import java.util.Set; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.drill.common.expression.SchemaPath; -import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema; - -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Table; -import org.apache.calcite.sql.SqlNode; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.logical.DrillTable; @@ -37,15 +33,20 @@ import 
org.apache.drill.exec.planner.sql.SchemaUtilites; import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig; +import org.apache.drill.exec.store.parquet.ParquetFormatConfig; import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.exec.store.parquet.metadata.Metadata; -import org.apache.drill.exec.store.parquet.ParquetFormatConfig; +import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.hadoop.fs.Path; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema; + public class RefreshMetadataHandler extends DefaultSqlHandler { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class); @@ -107,18 +108,20 @@ public class RefreshMetadataHandler extends DefaultSqlHandler { return notSupported(tableName); } - FormatSelection formatSelection = (FormatSelection) selection; + final FormatSelection formatSelection = (FormatSelection) selection; FormatPluginConfig formatConfig = formatSelection.getFormat(); if (!((formatConfig instanceof ParquetFormatConfig) || - ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) { + ((formatConfig instanceof NamedFormatPluginConfig) && + ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) { return notSupported(tableName); } - FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin(); - DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf()); + // Always create filesystem object 
using process user, since it owns the metadata file + final DrillFileSystem fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), + drillTable.getPlugin().getFormatPlugin(formatConfig).getFsConf()); - Path selectionRoot = formatSelection.getSelection().getSelectionRoot(); + final Path selectionRoot = formatSelection.getSelection().getSelectionRoot(); if (!fs.getFileStatus(selectionRoot).isDirectory()) { return notSupported(tableName); } @@ -127,7 +130,7 @@ public class RefreshMetadataHandler extends DefaultSqlHandler { formatConfig = new ParquetFormatConfig(); } - ParquetReaderConfig readerConfig = ParquetReaderConfig.builder() + final ParquetReaderConfig readerConfig = ParquetReaderConfig.builder() .withFormatConfig((ParquetFormatConfig) formatConfig) .withOptions(context.getOptions()) .build(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index cd4aec2..c61c458 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -24,23 +24,21 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; -import org.apache.drill.exec.serialization.PathSerDe; -import java.util.Set; -import org.apache.drill.exec.store.parquet.ParquetReaderConfig; -import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; - import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.tuple.Pair; import 
org.apache.drill.common.collections.Collectors; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.DrillVersionInfo; +import org.apache.drill.exec.serialization.PathSerDe; import org.apache.drill.exec.store.TimedCallable; import org.apache.drill.exec.store.dfs.MetadataContext; +import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -69,6 +67,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -85,7 +84,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3; /** - * This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig} + * This is a utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig}. 
All the creation of + * parquet metadata cache using create APIs is forced to happen using the process user since only that user will have + * write permission for the cache file */ public class Metadata { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); @@ -114,7 +115,7 @@ public class Metadata { */ public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig, boolean allColumns, Set<String> columnSet) throws IOException { Metadata metadata = new Metadata(readerConfig); - metadata.createMetaFilesRecursively(path, fs, allColumns, columnSet); + metadata.createMetaFilesRecursivelyAsProcessUser(path, fs, allColumns, columnSet); } /** @@ -204,6 +205,26 @@ public class Metadata { } /** + * Wrapper which makes sure that in all cases the metadata file is created as the process user no matter what the caller + * is passing. + * @param path to the directory of the parquet table + * @param fs file system + * @param allColumns if set, store column metadata for all the columns + * @param columnSet Set of columns for which column metadata has to be stored + * @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is + * a metadata for all subdirectories (if they are present and there are no parquet files in the + * {@code path} directory). 
+ * @throws IOException if parquet metadata can't be serialized and written to the json file + */ + private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> + createMetaFilesRecursivelyAsProcessUser(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) + throws IOException { + final FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), + fs.getConf()); + return createMetaFilesRecursively(path, processUserFileSystem, allColumns, columnSet); + } + + /** * Create the parquet metadata files for the directory at the given path and for any subdirectories. * Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths. * @@ -216,21 +237,23 @@ public class Metadata { * {@code path} directory). * @throws IOException if parquet metadata can't be serialized and written to the json file */ - private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) throws IOException { + private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> + createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) + throws IOException { Stopwatch timer = logger.isDebugEnabled() ? 
Stopwatch.createStarted() : null; List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList(); List<Path> directoryList = Lists.newArrayList(); ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet = new ConcurrentHashMap<>(); - Path p = path; - FileStatus fileStatus = fs.getFileStatus(p); + FileStatus fileStatus = fs.getFileStatus(path); assert fileStatus.isDirectory() : "Expected directory"; final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>(); - for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) { + for (final FileStatus file : DrillFileSystemUtil.listAll(fs, path, false)) { if (file.isDirectory()) { - ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns, columnSet)).getLeft(); + ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns, + columnSet)).getLeft(); metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); directoryList.add(file.getPath()); @@ -259,17 +282,17 @@ public class Metadata { parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet); for (String oldName : OLD_METADATA_FILENAMES) { - fs.delete(new Path(p, oldName), false); + fs.delete(new Path(path, oldName), false); } // relative paths in the metadata are only necessary for meta cache files. 
ParquetTableMetadata_v3 metadataTableWithRelativePaths = MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path); - writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME), fs); + writeFile(metadataTableWithRelativePaths, new Path(path, METADATA_FILENAME), fs); if (directoryList.size() > 0 && childFiles.size() == 0) { ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths = new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories); - writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME), fs); + writeFile(parquetTableMetadataDirsRelativePaths, new Path(path, METADATA_DIRECTORIES_FILENAME), fs); if (timer != null) { logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); } @@ -610,7 +633,7 @@ public class Metadata { parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath); if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadataDirs = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight(); + (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight(); newMetadata = true; } } else { @@ -625,7 +648,7 @@ public class Metadata { if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { // TODO change with current columns in existing metadata (auto refresh feature) parquetTableMetadata = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft(); + (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft(); newMetadata = true; } @@ -664,32 +687,29 @@ public class Metadata { FileStatus directoryStatus = 
fs.getFileStatus(parentDir); int numDirs = 1; if (directoryStatus.getModificationTime() > metaFileModifyTime) { - if (timer != null) { - logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories", - directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs); - timer.stop(); - } - return true; + return logAndStopTimer(true, directoryStatus.getPath().toString(), timer, numDirs); } + boolean isModified = false; for (Path directory : directories) { numDirs++; metaContext.setStatus(directory); directoryStatus = fs.getFileStatus(directory); if (directoryStatus.getModificationTime() > metaFileModifyTime) { - if (timer != null) { - logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories", - directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs); - timer.stop(); - } - return true; + isModified = true; + break; } } + return logAndStopTimer(isModified, directoryStatus.getPath().toString(), timer, numDirs); + } + + private boolean logAndStopTimer(boolean isModified, String directoryName, + Stopwatch timer, int numDirectories) { if (timer != null) { - logger.debug("No directories were modified. Took {} ms to check modification time of {} directories", - timer.elapsed(TimeUnit.MILLISECONDS), numDirs); + logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories", + isModified ? directoryName : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirectories); timer.stop(); } - return false; + return isModified; } }