Repository: drill Updated Branches: refs/heads/master 90f43bff7 -> 9cf6faa7a
DRILL-3867: Metadata Caching : Moving a directory which contains a cache file causes subsequent queries to fail - change absolute to relative path in the parquet metadata cache files; - add converting of the relative paths in the metadata to absolute ones. - test case when table is moved to other place after creating meta cache files. Changes according to the review Minor changes according to the review close apache/drill#824 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/964a9473 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/964a9473 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/964a9473 Branch: refs/heads/master Commit: 964a947315c31571954c1f4f56ac3336ac7bbcda Parents: 90f43bf Author: Vitalii Diravka <vitalii.dira...@gmail.com> Authored: Fri Apr 14 18:57:13 2017 +0000 Committer: Aman Sinha <asi...@maprtech.com> Committed: Sat Jun 24 09:16:01 2017 -0700 ---------------------------------------------------------------------- .../drill/exec/store/parquet/Metadata.java | 159 ++++++++++++++++--- .../exec/store/parquet/ParquetGroupScan.java | 15 +- .../java/org/apache/drill/BaseTestQuery.java | 58 +++++++ .../TestCorruptParquetDateCorrection.java | 82 ++-------- .../store/parquet/TestParquetMetadataCache.java | 53 ++++++- ...ies_with_absolute_paths.requires_replace.txt | 3 + ...ble_with_absolute_paths.requires_replace.txt | 108 +++++++++++++ ..._with_absolute_paths_t1.requires_replace.txt | 76 +++++++++ ..._with_absolute_paths_t2.requires_replace.txt | 76 +++++++++ 9 files changed, 530 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java index d85d6f1..0a4ce60 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -67,12 +67,10 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.KeyDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; @@ -159,10 +157,14 @@ public class Metadata { } /** - * Create the parquet metadata file for the directory at the given path, and for any subdirectories + * Create the parquet metadata files for the directory at the given path and for any subdirectories. + * Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths. * - * @param path - * @throws IOException + * @param path to the directory of the parquet table + * @return Pair of parquet metadata. The left one is a parquet metadata for the table. 
The right one of the Pair is + the metadata for all subdirectories (if they are present and there are no parquet files in the + {@code path} directory). + * @throws IOException if parquet metadata can't be serialized and written to the json file */ private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path) throws IOException { @@ -207,16 +209,21 @@ public class Metadata { } parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet); - for (String oldname : OLD_METADATA_FILENAMES) { - fs.delete(new Path(p, oldname), false); + for (String oldName : OLD_METADATA_FILENAMES) { + fs.delete(new Path(p, oldName), false); } - writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME)); + // relative paths in the metadata are only necessary for meta cache files. + ParquetTableMetadata_v3 metadataTableWithRelativePaths = + MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path); + writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME)); if (directoryList.size() > 0 && childFiles.size() == 0) { - ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList); - writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME)); + ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths = + new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories); + writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME)); logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); + ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList); return Pair.of(parquetTableMetadata, parquetTableMetadataDirs); } List<String> emptyDirList = Lists.newArrayList(); @@ -264,8 +271,10 @@ public class Metadata { /** * Get a list of file metadata for a list of parquet files * - * @param fileStatuses - * @return + 
* @param parquetTableMetadata_v3 can store column schema info from all the files and row groups + * @param fileStatuses list of the parquet files statuses + * + * @return list of the parquet file metadata with absolute paths * @throws IOException */ private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3( @@ -372,10 +381,6 @@ public class Metadata { /** * Get the metadata for a single file - * - * @param file - * @return - * @throws IOException */ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3 parquetTableMetadata, FileStatus file) throws IOException { @@ -537,7 +542,8 @@ public class Metadata { MetadataContext metaContext) throws IOException { Stopwatch timer = Stopwatch.createStarted(); Path p = new Path(path); - Path parentDir = p.getParent(); // parent directory of the metadata file + Path parentDir = Path.getPathWithoutSchemeAndAuthority(p.getParent()); // parent directory of the metadata file + String parentDirString = parentDir.toUri().toString(); // string representation for parent directory of the metadata file ObjectMapper mapper = new ObjectMapper(); final SimpleModule serialModule = new SimpleModule(); @@ -557,13 +563,14 @@ public class Metadata { boolean newMetadata = false; if (metaContext != null) { - alreadyCheckedModification = metaContext.getStatus(parentDir.toString()); + alreadyCheckedModification = metaContext.getStatus(parentDirString); } if (dirsOnly) { parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class); logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); + parquetTableMetadataDirs.updateRelativePaths(parentDirString); if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) { parquetTableMetadataDirs = (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight(); @@ -573,6 +580,9 @@ public class 
Metadata { parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class); logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); + if (parquetTableMetadata instanceof ParquetTableMetadata_v3) { + ((ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(parentDirString); + } if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) { parquetTableMetadata = (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft(); @@ -748,6 +758,12 @@ public class Metadata { return directories; } + /** If directories list contains relative paths, update it to absolute ones + * @param baseDir base parent directory + */ + @JsonIgnore public void updateRelativePaths(String baseDir) { + this.directories = MetadataPathUtils.convertToAbsolutePaths(directories, baseDir); + } } @JsonTypeName("v1") @@ -1413,6 +1429,18 @@ public class Metadata { return directories; } + /** + * If directories list and file metadata list contain relative paths, update it to absolute ones + * @param baseDir base parent directory + */ + @JsonIgnore public void updateRelativePaths(String baseDir) { + // update directories paths to absolute ones + this.directories = MetadataPathUtils.convertToAbsolutePaths(directories, baseDir); + + // update files paths to absolute ones + this.files = MetadataPathUtils.convertToFilesWithAbsolutePaths(files, baseDir); + } + @JsonIgnore @Override public List<? 
extends ParquetFileMetadata> getFiles() { return files; } @@ -1736,5 +1764,98 @@ public class Metadata { } + /** + * Util class that contains helper methods for converting paths in the table and directory metadata structures + */ + private static class MetadataPathUtils { + + /** + * Helper method that converts a list of relative paths to absolute ones + * + * @param paths list of relative paths + * @param baseDir base parent directory + * @return list of absolute paths + */ + private static List<String> convertToAbsolutePaths(List<String> paths, String baseDir) { + if (!paths.isEmpty()) { + List<String> absolutePaths = Lists.newArrayList(); + for (String relativePath : paths) { + String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath + : new Path(baseDir, relativePath).toUri().toString(); + absolutePaths.add(absolutePath); + } + return absolutePaths; + } + return paths; + } + + /** + * Convert a list of files with relative paths to files with absolute ones + * + * @param files list of files with relative paths + * @param baseDir base parent directory + * @return list of files with absolute paths + */ + private static List<ParquetFileMetadata_v3> convertToFilesWithAbsolutePaths( + List<ParquetFileMetadata_v3> files, String baseDir) { + if (!files.isEmpty()) { + List<ParquetFileMetadata_v3> filesWithAbsolutePaths = Lists.newArrayList(); + for (ParquetFileMetadata_v3 file : files) { + Path relativePath = new Path(file.getPath()); + // create a new file if old one contains a relative path, otherwise use an old file + ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? file + : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().toString(), file.length, file.rowGroups); + filesWithAbsolutePaths.add(fileWithAbsolutePath); + } + return filesWithAbsolutePaths; + } + return files; + } + + /** + * Creates a new parquet table metadata from the {@code tableMetadataWithAbsolutePaths} parquet table. 
+ * A new parquet table will contain relative paths for the files and directories. + * + * @param tableMetadataWithAbsolutePaths parquet table metadata with absolute paths for the files and directories + * @param baseDir base parent directory + * @return parquet table metadata with relative paths for the files and directories + */ + private static ParquetTableMetadata_v3 createMetadataWithRelativePaths( + ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) { + List<String> directoriesWithRelativePaths = Lists.newArrayList(); + for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) { + directoriesWithRelativePaths.add(relativize(baseDir, directory)) ; + } + List<ParquetFileMetadata_v3> filesWithRelativePaths = Lists.newArrayList(); + for (ParquetFileMetadata_v3 file : tableMetadataWithAbsolutePaths.files) { + filesWithRelativePaths.add(new ParquetFileMetadata_v3( + relativize(baseDir, file.getPath()), file.length, file.rowGroups)); + } + return new ParquetTableMetadata_v3(tableMetadataWithAbsolutePaths, filesWithRelativePaths, + directoriesWithRelativePaths, DrillVersionInfo.getVersion()); + } + + /** + * Constructs relative path from child full path and base path. 
Or returns the child path if it is already relative + * + * @param baseDir base path (the part of the Path which should be cut off from the child path) + * @param childPath full absolute path + * @return relative path + */ + private static String relativize(String baseDir, String childPath) { + Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath)); + Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir)); + + // Since hadoop Path doesn't have relativize(), we use uri.relativize() to get the relative path + Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri() + .relativize(fullPathWithoutSchemeAndAuthority.toUri())); + if (relativeFilePath.isAbsolute()) { + throw new IllegalStateException(String.format("Path %s is not a subpath of %s.", + basePathWithoutSchemeAndAuthority.toUri().toString(), fullPathWithoutSchemeAndAuthority.toUri().toString())); + } + return relativeFilePath.toUri().toString(); + } + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 71e681b..3d9cfb3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.generic.GenericData; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -44,12 +43,10 @@ import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.expr.stat.ParquetFilterPredicate; -import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractFileGroupScan; -import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; @@ -58,7 +55,6 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.ImplicitColumnExplorer; -import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.DrillPathFilter; @@ -677,15 +673,16 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } else { // we need to expand the files from fileStatuses for (FileStatus status : fileStatuses) { + Path cacheFileRoot = status.getPath(); if (status.isDirectory()) { //TODO [DRILL-4496] read the metadata cache 
files in parallel - final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME); + final Path metaPath = new Path(cacheFileRoot, Metadata.METADATA_FILENAME); final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext(), formatConfig); for (Metadata.ParquetFileMetadata file : metadata.getFiles()) { fileSet.add(file.getPath()); } } else { - final Path path = Path.getPathWithoutSchemeAndAuthority(status.getPath()); + final Path path = Path.getPathWithoutSchemeAndAuthority(cacheFileRoot); fileSet.add(path.toString()); } } @@ -718,9 +715,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } private void init(MetadataContext metaContext) throws IOException { + Path metaPath = null; if (entries.size() == 1 && parquetTableMetadata == null) { Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath())); - Path metaPath = null; if (fs.isDirectory(p)) { // Using the metadata file makes sense when querying a directory; otherwise // if querying a single file we can look up the metadata directly from the file @@ -734,7 +731,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } } else { Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)); - Path metaPath = new Path(p, Metadata.METADATA_FILENAME); + metaPath = new Path(p, Metadata.METADATA_FILENAME); if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) { usedMetadataCache = true; if (parquetTableMetadata == null) { http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 4401b9f..e3ce9fc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -36,6 +36,7 @@ import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.scanner.persistence.ScanResult; +import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.client.DrillClient; @@ -58,6 +59,10 @@ import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.util.TestUtilities; import org.apache.drill.exec.util.VectorUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.rules.TestRule; @@ -69,6 +74,9 @@ import com.google.common.base.Preconditions; import com.google.common.io.Resources; import java.util.ArrayList; import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; @@ -120,6 +128,8 @@ public class BaseTestQuery extends ExecTest { private static ScanResult classpathScan; + private static FileSystem fs; + @BeforeClass public static void setupDefaultTestCluster() throws Exception { config = DrillConfig.create(TEST_CONFIGURATIONS); @@ -128,6 +138,9 @@ public class BaseTestQuery extends ExecTest { // turns on the verbose errors in tests // sever side stacktraces are added to the message before sending back to the client test("ALTER SESSION SET `exec.errors.verbose` = true"); + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); + fs = FileSystem.get(conf); } protected static void 
updateTestCluster(int newDrillbitCount, DrillConfig newConfig) { @@ -593,4 +606,49 @@ public class BaseTestQuery extends ExecTest { } } } + + private static String replaceWorkingPathInString(String orig) { + return orig.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath())); + } + + protected static void copyDirectoryIntoTempSpace(String resourcesDir) throws IOException { + copyDirectoryIntoTempSpace(resourcesDir, null); + } + + protected static void copyDirectoryIntoTempSpace(String resourcesDir, String destinationSubDir) throws IOException { + Path destination = destinationSubDir != null ? new Path(getDfsTestTmpSchemaLocation(), destinationSubDir) + : new Path(getDfsTestTmpSchemaLocation()); + fs.copyFromLocalFile( + new Path(replaceWorkingPathInString(resourcesDir)), + destination); + } + + /** + * Old metadata cache files include full paths to the files that have been scanned. + * <p> + * There is no way to generate a metadata cache file with absolute paths that + * will be guaranteed to be available on an arbitrary test machine. + * <p> + * To enable testing older metadata cache files, they were generated manually + * using older drill versions, and the absolute path up to the folder where + * the metadata cache file appeared was manually replaced with the string + * REPLACED_IN_TEST. Here the file is re-written into the given temporary + * location after the REPLACED_IN_TEST string has been replaced by the actual + * location generated during this run of the tests. 
+ * + * @param srcFileOnClassPath the source path of metadata cache file, which should be replaced + * @param destFolderInTmp the parent folder name of the metadata cache file + * @param metaFileName the name of metadata cache file depending on the type of the metadata + * @throws IOException if a create or write error occurs + */ + protected static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp, + String metaFileName) throws IOException { + String metadataFileContents = getFile(srcFileOnClassPath); + Path rootMeta = new Path(dfsTestTmpSchemaLocation, destFolderInTmp); + Path newMetaCache = new Path(rootMeta, metaFileName); + FSDataOutputStream outSteam = fs.create(newMetaCache); + outSteam.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", dfsTestTmpSchemaLocation)); + outSteam.close(); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java index 0c98eee..df13799 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,20 +21,13 @@ import static java.lang.String.format; import org.apache.drill.PlanTestBase; import org.apache.drill.TestBuilder; -import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; +import org.apache.drill.exec.store.parquet.Metadata; import org.apache.hadoop.fs.Path; import org.joda.time.DateTime; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - /** * Tests for compatibility reading old parquet files after date corruption * issue was fixed in DRILL-4203. @@ -106,21 +99,13 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { private static final String PARTITIONED_1_2_FOLDER = "partitioned_with_corruption_4203_1_2"; private static final String MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER = "mixed_partitioned"; - private static FileSystem fs; - private static Path path; @BeforeClass public static void initFs() throws Exception { - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); - fs = FileSystem.get(conf); - path = new Path(getDfsTestTmpSchemaLocation()); - - // Move files into temp directory, rewrite the metadata cache file to contain the appropriate absolute - // path + // Move files into temp directory, rewrite the metadata cache file to contain the appropriate absolute path copyDirectoryIntoTempSpace(CORRUPTED_PARTITIONED_DATES_1_2_PATH); copyMetaDataCacheToTempReplacingInternalPaths("parquet/4203_corrupt_dates/drill.parquet.metadata_1_2.requires_replace.txt", - PARTITIONED_1_2_FOLDER); + PARTITIONED_1_2_FOLDER, Metadata.METADATA_FILENAME); copyDirectoryIntoTempSpace(CORRUPTED_PARTITIONED_DATES_1_2_PATH, 
MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER); copyDirectoryIntoTempSpace(CORRECT_PARTITIONED_DATES_1_9_PATH, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER); copyDirectoryIntoTempSpace(CORRUPTED_PARTITIONED_DATES_1_4_0_PATH, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER); @@ -334,7 +319,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { @Test public void testReadOldMetadataCacheFile() throws Exception { // for sanity, try reading all partitions without a filter - String query = format("select date_col from dfs.`%s`", new Path(path, PARTITIONED_1_2_FOLDER)); + String query = format("select date_col from dfs.`%s`", new Path(getDfsTestTmpSchemaLocation(), PARTITIONED_1_2_FOLDER)); TestBuilder builder = testBuilder() .sqlQuery(query) .unOrdered() @@ -347,7 +332,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { @Test public void testReadOldMetadataCacheFileWithPruning() throws Exception { String query = format("select date_col from dfs.`%s` where date_col = date '1970-01-01'", - new Path(path, PARTITIONED_1_2_FOLDER)); + new Path(getDfsTestTmpSchemaLocation(), PARTITIONED_1_2_FOLDER)); // verify that pruning is actually taking place testPlanMatchingPatterns(query, new String[]{"numFiles=1", "usedMetadataFile=true"}, null); @@ -365,7 +350,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { // for sanity, try reading all partitions without a filter TestBuilder builder = testBuilder() .sqlQuery("select date_col from table(dfs.`%s` (type => 'parquet', autoCorrectCorruptDates => false))", - new Path(path, PARTITIONED_1_2_FOLDER)) + new Path(getDfsTestTmpSchemaLocation(), PARTITIONED_1_2_FOLDER)) .unOrdered() .baselineColumns("date_col"); addCorruptedDateBaselineValues(builder); @@ -373,7 +358,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { String query = format("select date_col from table(dfs.`%s` (type => 'parquet', " + "autoCorrectCorruptDates => false)) where date_col = 
cast('15334-03-17' as date)", - new Path(path, PARTITIONED_1_2_FOLDER)); + new Path(getDfsTestTmpSchemaLocation(), PARTITIONED_1_2_FOLDER)); // verify that pruning is actually taking place testPlanMatchingPatterns(query, new String[]{"numFiles=1", "usedMetadataFile=true"}, null); @@ -388,9 +373,10 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { @Test public void testReadNewMetadataCacheFileOverOldAndNewFiles() throws Exception { - String table = format("dfs.`%s`", new Path(path, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER)); - copyMetaDataCacheToTempReplacingInternalPaths("parquet/4203_corrupt_dates/" + - "mixed_version_partitioned_metadata.requires_replace.txt", MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER); + String table = format("dfs.`%s`", new Path(getDfsTestTmpSchemaLocation(), MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER)); + copyMetaDataCacheToTempReplacingInternalPaths( + "parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt", + MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER, Metadata.METADATA_FILENAME); // for sanity, try reading all partitions without a filter TestBuilder builder = testBuilder() .sqlQuery("select date_col from " + table) @@ -488,48 +474,4 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase { } } - private static String replaceWorkingPathInString(String orig) { - return orig.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath())); - } - - private static void copyDirectoryIntoTempSpace(String resourcesDir) throws IOException { - copyDirectoryIntoTempSpace(resourcesDir, null); - } - - private static void copyDirectoryIntoTempSpace(String resourcesDir, String destinationSubDir) throws IOException { - Path destination = path; - if (destinationSubDir != null) { - destination = new Path(path, destinationSubDir); - } - fs.copyFromLocalFile( - new Path(replaceWorkingPathInString(resourcesDir)), - destination); - } - - /** - * 
Metadata cache files include full paths to the files that have been scanned. - * - * There is no way to generate a metadata cache file with absolute paths that - * will be guaranteed to be available on an arbitrary test machine. - * - * To enable testing older metadata cache files, they were generated manually - * using older drill versions, and the absolute path up to the folder where - * the metadata cache file appeared was manually replaced with the string - * REPLACED_IN_TEST. Here the file is re-written into the given temporary - * location after the REPLACED_IN_TEST string has been replaced by the actual - * location generated during this run of the tests. - * - * @param srcFileOnClassPath - * @param destFolderInTmp - * @throws IOException - */ - private static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp) - throws IOException { - String metadataFileContents = getFile(srcFileOnClassPath); - Path newMetaCache = new Path(new Path(path, destFolderInTmp), ".drill.parquet_metadata"); - FSDataOutputStream outSteam = fs.create(newMetaCache); - outSteam.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", path.toString())); - outSteam.close(); - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java index e199ba5..dff2e86 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more 
contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -30,12 +30,15 @@ import java.io.File; import java.nio.file.Files; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestParquetMetadataCache extends PlanTestBase { private static final String WORKING_PATH = TestTools.getWorkingPath(); private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; private static final String tableName1 = "parquetTable1"; private static final String tableName2 = "parquetTable2"; + private static final String RELATIVE_PATHS_METADATA = "relative_paths_metadata"; @BeforeClass @@ -398,10 +401,56 @@ public class TestParquetMetadataCache extends PlanTestBase { } + @Test // DRILL-3867 + public void testMoveCache() throws Exception { + final String tableName = "nation_move"; + final String newTableName = "nation_moved"; + try { + test("use dfs_test.tmp"); + test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName); + test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName); + test("refresh table metadata %s", tableName); + checkForMetadataFile(tableName); + File srcFile = new File(getDfsTestTmpSchemaLocation(), tableName); + File dstFile = new File(getDfsTestTmpSchemaLocation(), newTableName); + FileUtils.moveDirectory(srcFile, dstFile); + assertFalse("Cache file was not moved successfully", srcFile.exists()); + int rowCount = testSql(String.format("select * from %s", newTableName)); + assertEquals("An incorrect result was obtained while querying a table with metadata cache files", 50, rowCount); + } finally { + test("drop table if exists %s", newTableName); + } + } + + @Test + public void testMetadataCacheAbsolutePaths() throws Exception { + try { + test("use dfs_test.tmp"); + final String relative_path_metadata_t1 = RELATIVE_PATHS_METADATA + "/t1"; + final String 
relative_path_metadata_t2 = RELATIVE_PATHS_METADATA + "/t2"; + test("create table `%s` as select * from cp.`tpch/nation.parquet`", relative_path_metadata_t1); + test("create table `%s` as select * from cp.`tpch/nation.parquet`", relative_path_metadata_t2); + copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" + + "metadata_directories_with_absolute_paths.requires_replace.txt", RELATIVE_PATHS_METADATA, Metadata.METADATA_DIRECTORIES_FILENAME); + copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" + + "metadata_table_with_absolute_paths.requires_replace.txt", RELATIVE_PATHS_METADATA, Metadata.METADATA_FILENAME); + copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" + + "metadata_table_with_absolute_paths_t1.requires_replace.txt", relative_path_metadata_t1, Metadata.METADATA_FILENAME); + copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" + + "metadata_table_with_absolute_paths_t2.requires_replace.txt", relative_path_metadata_t2, Metadata.METADATA_FILENAME); + + int rowCount = testSql(String.format("select * from %s", RELATIVE_PATHS_METADATA)); + assertEquals("An incorrect result was obtained while querying a table with metadata cache files", 50, rowCount); + } finally { + test("drop table if exists %s", RELATIVE_PATHS_METADATA); + } + } + private void checkForMetadataFile(String table) throws Exception { String tmpDir = getDfsTestTmpSchemaLocation(); String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME); - Assert.assertTrue(Files.exists(new File(metaFile).toPath())); + assertTrue(String.format("There is no metadata cache file for the %s table", table), + Files.exists(new File(metaFile).toPath())); } } http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt new file mode 100644 index 0000000..887988d --- /dev/null +++ b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt @@ -0,0 +1,3 @@ +{ + "directories" : [ "file:REPLACED_IN_TEST/relative_paths_metadata/t1", "file:REPLACED_IN_TEST/relative_paths_metadata/t2" ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths.requires_replace.txt ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths.requires_replace.txt new file mode 100644 index 0000000..78a822e --- /dev/null +++ b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths.requires_replace.txt @@ -0,0 +1,108 @@ +{ + "metadata_version" : "v3", + "columnTypeInfo" : { + "n_name" : { + "name" : [ "n_name" ], + "primitiveType" : "BINARY", + "originalType" : "UTF8", + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_nationkey" : { + "name" : [ "n_nationkey" ], + "primitiveType" : "INT32", + "originalType" : null, + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_regionkey" : { + "name" : [ "n_regionkey" ], + "primitiveType" : "INT32", + "originalType" : null, + 
"precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_comment" : { + "name" : [ "n_comment" ], + "primitiveType" : "BINARY", + "originalType" : "UTF8", + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + } + }, + "files" : [ { + "path" : "REPLACED_IN_TEST/relative_paths_metadata/t1/0_0_0.parquet", + "length" : 2424, + "rowGroups" : [ { + "start" : 4, + "length" : 1802, + "rowCount" : 25, + "hostAffinity" : { + "localhost" : 1.0 + }, + "columns" : [ { + "name" : [ "n_nationkey" ], + "minValue" : 0, + "maxValue" : 24, + "nulls" : 0 + }, { + "name" : [ "n_name" ], + "minValue" : "ALGERIA", + "maxValue" : "VIETNAM", + "nulls" : 0 + }, { + "name" : [ "n_regionkey" ], + "minValue" : 0, + "maxValue" : 4, + "nulls" : 0 + }, { + "name" : [ "n_comment" ], + "minValue" : " haggle. carefully final deposits detect slyly agai", + "maxValue" : "y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be", + "nulls" : 0 + } ] + } ] + }, { + "path" : "REPLACED_IN_TEST/relative_paths_metadata/t2/0_0_0.parquet", + "length" : 2424, + "rowGroups" : [ { + "start" : 4, + "length" : 1802, + "rowCount" : 25, + "hostAffinity" : { + "localhost" : 1.0 + }, + "columns" : [ { + "name" : [ "n_nationkey" ], + "minValue" : 0, + "maxValue" : 24, + "nulls" : 0 + }, { + "name" : [ "n_name" ], + "minValue" : "ALGERIA", + "maxValue" : "VIETNAM", + "nulls" : 0 + }, { + "name" : [ "n_regionkey" ], + "minValue" : 0, + "maxValue" : 4, + "nulls" : 0 + }, { + "name" : [ "n_comment" ], + "minValue" : " haggle. carefully final deposits detect slyly agai", + "maxValue" : "y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. 
unusual pinto be", + "nulls" : 0 + } ] + } ] + } ], + "directories" : [ "file:REPLACED_IN_TEST/relative_paths_metadata/t1", "file:REPLACED_IN_TEST/relative_paths_metadata/t2" ], + "drillVersion" : "1.11.0" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t1.requires_replace.txt ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t1.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t1.requires_replace.txt new file mode 100644 index 0000000..7ce10e7 --- /dev/null +++ b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t1.requires_replace.txt @@ -0,0 +1,76 @@ +{ + "metadata_version" : "v3", + "columnTypeInfo" : { + "n_name" : { + "name" : [ "n_name" ], + "primitiveType" : "BINARY", + "originalType" : "UTF8", + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_nationkey" : { + "name" : [ "n_nationkey" ], + "primitiveType" : "INT32", + "originalType" : null, + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_regionkey" : { + "name" : [ "n_regionkey" ], + "primitiveType" : "INT32", + "originalType" : null, + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_comment" : { + "name" : [ "n_comment" ], + "primitiveType" : "BINARY", + "originalType" : "UTF8", + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + } + }, + "files" : [ { + "path" : "REPLACED_IN_TEST/relative_paths_metadata/t1/0_0_0.parquet", + "length" : 2424, + "rowGroups" : [ { + "start" : 4, + "length" : 1802, + "rowCount" : 25, + "hostAffinity" : { 
+ "localhost" : 1.0 + }, + "columns" : [ { + "name" : [ "n_nationkey" ], + "minValue" : 0, + "maxValue" : 24, + "nulls" : 0 + }, { + "name" : [ "n_name" ], + "minValue" : "ALGERIA", + "maxValue" : "VIETNAM", + "nulls" : 0 + }, { + "name" : [ "n_regionkey" ], + "minValue" : 0, + "maxValue" : 4, + "nulls" : 0 + }, { + "name" : [ "n_comment" ], + "minValue" : " haggle. carefully final deposits detect slyly agai", + "maxValue" : "y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be", + "nulls" : 0 + } ] + } ] + } ], + "directories" : [ ], + "drillVersion" : "1.11.0" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/964a9473/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t2.requires_replace.txt ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t2.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t2.requires_replace.txt new file mode 100644 index 0000000..c984f58 --- /dev/null +++ b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_table_with_absolute_paths_t2.requires_replace.txt @@ -0,0 +1,76 @@ +{ + "metadata_version" : "v3", + "columnTypeInfo" : { + "n_name" : { + "name" : [ "n_name" ], + "primitiveType" : "BINARY", + "originalType" : "UTF8", + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_nationkey" : { + "name" : [ "n_nationkey" ], + "primitiveType" : "INT32", + "originalType" : null, + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + }, + "n_regionkey" : { + "name" : [ "n_regionkey" ], + "primitiveType" : "INT32", + "originalType" : null, + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + 
"definitionLevel" : 0 + }, + "n_comment" : { + "name" : [ "n_comment" ], + "primitiveType" : "BINARY", + "originalType" : "UTF8", + "precision" : 0, + "scale" : 0, + "repetitionLevel" : 0, + "definitionLevel" : 0 + } + }, + "files" : [ { + "path" : "REPLACED_IN_TEST/relative_paths_metadata/t2/0_0_0.parquet", + "length" : 2424, + "rowGroups" : [ { + "start" : 4, + "length" : 1802, + "rowCount" : 25, + "hostAffinity" : { + "localhost" : 1.0 + }, + "columns" : [ { + "name" : [ "n_nationkey" ], + "minValue" : 0, + "maxValue" : 24, + "nulls" : 0 + }, { + "name" : [ "n_name" ], + "minValue" : "ALGERIA", + "maxValue" : "VIETNAM", + "nulls" : 0 + }, { + "name" : [ "n_regionkey" ], + "minValue" : 0, + "maxValue" : 4, + "nulls" : 0 + }, { + "name" : [ "n_comment" ], + "minValue" : " haggle. carefully final deposits detect slyly agai", + "maxValue" : "y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be", + "nulls" : 0 + } ] + } ] + } ], + "directories" : [ ], + "drillVersion" : "1.11.0" +} \ No newline at end of file