[GOBBLIN-387] pick job files in FIFO order

pick job files in FIFO order
delete temp file on exit dummy commit fix findBugsMain Closes #2264 from arjun4084346/jobOrder Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cd9447a5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cd9447a5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cd9447a5 Branch: refs/heads/0.12.0 Commit: cd9447a58c0bc66bc24d223803bd8f33dda31887 Parents: 94bcc16 Author: Arjun <[email protected]> Authored: Mon Feb 5 11:44:14 2018 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Feb 5 11:44:14 2018 -0800 ---------------------------------------------------------------------- .../org/apache/gobblin/util/PullFileLoader.java | 37 +++++++++++++++----- .../apache/gobblin/util/PullFileLoaderTest.java | 36 ++++++++++++++++++- 2 files changed, 64 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd9447a5/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java index a68f9ac..210615c 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java @@ -23,6 +23,8 @@ import java.io.InputStreamReader; import java.io.Reader; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Properties; import java.util.Set; @@ -74,7 +76,7 @@ public class PullFileLoader { public static final String PROPERTY_DELIMITER_PARSING_ENABLED_KEY = "property.parsing.enablekey"; public static final 
boolean DEFAULT_PROPERTY_DELIMITER_PARSING_ENABLED_KEY = false; - + private final Path rootDirectory; private final FileSystem fs; private final ExtensionFilter javaPropsPullFileFilter; @@ -145,28 +147,36 @@ public class PullFileLoader { } /** - * Find and load all pull files under a base {@link Path} recursively. + * Find and load all pull files under a base {@link Path} recursively in an order sorted by last modified date. * @param path base {@link Path} where pull files should be found recursively. * @param sysProps A {@link Config} used as fallback. * @param loadGlobalProperties if true, will also load at most one *.properties file per directory from the * {@link #rootDirectory} to the pull file {@link Path} for each pull file. * @return The loaded {@link Config}s. */ - public Collection<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) { + public List<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) { try { Config fallback = sysProps; if (loadGlobalProperties && PathUtils.isAncestor(this.rootDirectory, path.getParent())) { fallback = loadAncestorGlobalConfigs(path.getParent(), fallback); } - return loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties); + return getSortedConfigs(loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties)); } catch (IOException ioe) { return Lists.newArrayList(); } } - private Collection<Config> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) { - List<Config> pullFiles = Lists.newArrayList(); + private List<Config> getSortedConfigs(List<ConfigWithTimeStamp> configsWithTimeStamps) { + List<Config> sortedConfigs = Lists.newArrayList(); + Collections.sort(configsWithTimeStamps, (config1, config2) -> (config1.timeStamp > config2.timeStamp) ? 
1 : -1); + for (ConfigWithTimeStamp configWithTimeStamp : configsWithTimeStamps) { + sortedConfigs.add(configWithTimeStamp.config); + } + return sortedConfigs; + } + private List<ConfigWithTimeStamp> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) { + List<ConfigWithTimeStamp> pullFiles = Lists.newArrayList(); try { if (loadGlobalProperties) { fallback = findAndLoadGlobalConfigInDirectory(path, fallback); @@ -183,9 +193,11 @@ public class PullFileLoader { if (status.isDirectory()) { pullFiles.addAll(loadPullFilesRecursivelyHelper(status.getPath(), fallback, loadGlobalProperties)); } else if (this.javaPropsPullFileFilter.accept(status.getPath())) { - pullFiles.add(loadJavaPropsWithFallback(status.getPath(), fallback).resolve()); + log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime()); + pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadJavaPropsWithFallback(status.getPath(), fallback).resolve())); } else if (this.hoconPullFileFilter.accept(status.getPath())) { - pullFiles.add(loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve()); + log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime()); + pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve())); } } catch (IOException ioe) { // Failed to load specific subpath, try with the other subpaths in this directory @@ -298,4 +310,13 @@ public class PullFileLoader { } } + private static class ConfigWithTimeStamp { + long timeStamp; + Config config; + + public ConfigWithTimeStamp(long timeStamp, Config config) { + this.timeStamp = timeStamp; + this.config = config; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd9447a5/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java ---------------------------------------------------------------------- 
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java index 2560206..1c8bff8 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java @@ -17,8 +17,11 @@ package org.apache.gobblin.util; +import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.util.Collection; +import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -28,6 +31,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -139,6 +143,36 @@ public class PullFileLoaderTest { Assert.assertEquals(pullFile.entrySet().size(), 4); } + /** + * Tests to verify job written first to the job catalog is picked up first. 
+ * @throws Exception + */ + @Test void testJobLoadingOrder() throws Exception { + Properties sysProps = new Properties(); + FileSystem fs = FileSystem.getLocal(new Configuration()); + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + Path localBasePath = new Path(tmpDir.getAbsolutePath(), "PullFileLoaderTestDir"); + fs.mkdirs(localBasePath); + + for (int i=5; i>0; i--) { + String job = localBasePath.toString() + "/job" + i + ".conf"; + PrintWriter writer = new PrintWriter(job, "UTF-8"); + writer.println("key=job" + i + "_val"); + writer.close(); + Thread.sleep(1000); + } + + List<Config> configs = + loader.loadPullFilesRecursively(localBasePath, ConfigUtils.propertiesToConfig(sysProps), false); + + int i = 5; + for (Config config : configs) { + Assert.assertEquals(config.getString("key"), "job" + i + "_val"); + i--; + } + } + @Test public void testJobLoadingWithSysPropsAndGlobalProps() throws Exception { Path path; @@ -230,7 +264,7 @@ public class PullFileLoaderTest { pullFile = loader.loadPullFile(path, cfg, false); Assert.assertEquals(pullFile.getString("json.property.key"), pullFile.getString("json.property.key1")); } - + private Config pullFileFromPath(Collection<Config> configs, Path path) throws IOException { for (Config config : configs) { if (config.getString(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY).equals(path.toString())) {
