Repository: oozie Updated Branches: refs/heads/master e229e4dbb -> 6dedac608
OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6dedac60 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6dedac60 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6dedac60 Branch: refs/heads/master Commit: 6dedac6082c1cc3c4012a3c279632329a8b30482 Parents: e229e4d Author: Andras Piros <[email protected]> Authored: Thu Aug 9 13:05:54 2018 +0300 Committer: Andras Piros <[email protected]> Committed: Thu Aug 9 13:05:54 2018 +0300 ---------------------------------------------------------------------- .../apache/oozie/service/ShareLibService.java | 18 +- .../oozie/service/TestHAShareLibService.java | 2 +- .../oozie/service/TestShareLibService.java | 166 ++++++++++++++++++- release-log.txt | 1 + 4 files changed, 174 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/main/java/org/apache/oozie/service/ShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java index a901567..b88dab3 100644 --- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java +++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java @@ -85,8 +85,6 @@ public class ShareLibService implements Service, Instrumentable { public static final String SHARE_LIB_PREFIX = "lib_"; - public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); - private Services services; private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>(); @@ -119,6 +117,14 @@ public class ShareLibService implements Service, Instrumentable { final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION); + @VisibleForTesting + protected static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyyMMddHHmmss"); + } + }; + @Override public void init(Services services) throws ServiceException { this.services = services; @@ -519,7 +525,7 @@ public class ShareLibService implements Service, Instrumentable { String time = name.substring(prefix.length()); Date d = null; try { - d = dateFormat.parse(time); + d = dt.get().parse(time); } catch (ParseException e) { return false; @@ -707,7 +713,7 @@ public class ShareLibService implements Service, Instrumentable { * @return the launcherlib path */ private Path getLauncherlibPath() { - String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime()); + String formattedDate = dt.get().format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime()); Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX + formattedDate); return tmpLauncherLibPath; @@ -733,11 +739,11 @@ public class ShareLibService implements Service, Instrumentable { FileStatus[] files = fs.listStatus(rootDir, directoryFilter); for (FileStatus file : files) { - String name = file.getPath().getName().toString(); + String name = file.getPath().getName(); String time = name.substring(prefix.length()); Date d = null; try { - d = dateFormat.parse(time); + d = dt.get().parse(time); } catch (ParseException e) { continue; http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java index 5087505..144d0c5 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java @@ -69,7 +69,7 @@ public class TestHAShareLibService extends ZKXTestCase { Date time = new Date(System.currentTimeMillis()); Path basePath = new Path(Services.get().getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time)); fs.mkdirs(libpath); Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/test/java/org/apache/oozie/service/TestShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java index d244166..95dab5c 100644 --- a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java +++ b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java @@ -24,21 +24,33 @@ import java.io.PrintWriter; import java.net.URI; import java.net.URLDecoder; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import com.google.common.collect.Lists; import com.google.common.io.Files; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context; @@ -53,9 +65,10 @@ import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; +import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.Lists; +import org.mockito.Mockito; public class TestShareLibService extends XFsTestCase { private static final String HDFS_SCHEME_PREFIX = "hdfs"; @@ -210,7 +223,7 @@ public class TestShareLibService extends XFsTestCase { Path basePath = new Path(getOozieConfig() .get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time)); fs.mkdirs(libpath); Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); @@ -381,7 +394,7 @@ public class TestShareLibService extends XFsTestCase { // Use timedstamped directory if available Date time = new Date(System.currentTimeMillis()); - Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time)); fs.mkdirs(libpath); Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); @@ -483,7 +496,7 @@ public class TestShareLibService extends XFsTestCase { Path basePath = new Path(getOozieConfig() .get(WorkflowAppService.SYSTEM_LIB_PATH)); Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX - + ShareLibService.dateFormat.format(time)); + + ShareLibService.dt.get().format(time)); fs.mkdirs(libpath); Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); @@ -545,7 +558,7 @@ public class TestShareLibService extends XFsTestCase { Path basePath = new Path(getOozieConfig() .get(WorkflowAppService.SYSTEM_LIB_PATH)); Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX - + ShareLibService.dateFormat.format(time)); + + ShareLibService.dt.get().format(time)); fs.mkdirs(libpath); Path ooziePath = new Path(libpath.toString() + Path.SEPARATOR + "oozie"); fs.mkdirs(ooziePath); @@ -630,7 +643,7 @@ public class TestShareLibService extends XFsTestCase { // Use timedstamped directory if available Date time = new Date(System.currentTimeMillis()); - Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time)); Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); createDirs(fs, pigPath, new Path(pigPath, "temp")); @@ -1010,6 +1023,147 @@ public class TestShareLibService extends XFsTestCase { } } + @Test + public void testParsingALotOfShareLibsParallel() throws ServiceException, IOException { + setShipLauncherInOozieConfig(); + services.init(); + // destroying, as we dont want the sharelib dirs purge to be scheduled + services.get(SchedulerService.class).destroy(); + + final List<FileStatus> fileStatuses = new ArrayList<>(); + + final Path rootDir = Mockito.mock(Path.class); + final FileSystem fs = Mockito.mock(FileSystem.class); + + final int NUMBER_OF_FILESTATUSES = 100; + + for (int i = 0; i < NUMBER_OF_FILESTATUSES; ++i) { + createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1); + } + + final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[1]); + Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses); + + final ShareLibService shareLibService = services.get(ShareLibService.class); + shareLibService.fs = fs; + + runGivenCallableOnThreads(() -> { + try { + shareLibService.getLatestLibPath(rootDir, "lib_"); + } catch (final IOException | NumberFormatException e) { + log.error(e.getMessage()); + Thread.currentThread().interrupt(); + return false; + } + return true; + }, 10, 10); + } + + @Test + public void testDeterminingLatestSharelibPathOn1Thread() throws IOException, ServiceException { + testDeterminingLatestSharelibPath(1); + } + + @Test + public void testDeterminingLatestSharelibPathOn5Threads() throws IOException, ServiceException { + testDeterminingLatestSharelibPath(5); + } + + @Test + public void testDeterminingLatestSharelibPathOn10Threads() throws IOException, ServiceException { + testDeterminingLatestSharelibPath(10); + } + + private void testDeterminingLatestSharelibPath(final int numberOfThreads) throws ServiceException, IOException { + setShipLauncherInOozieConfig(); + services.init(); + // destroying, as we dont want the sharelib dirs purge to be scheduled + services.get(SchedulerService.class).destroy(); + + final List<FileStatus> fileStatuses = new ArrayList<>(); + createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1); + createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 1, 1); + final Path filePath3 = createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 1, 0, 1); + + final Path rootDir = Mockito.mock(Path.class); + final FileSystem fs = Mockito.mock(FileSystem.class); + + final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[1]); + Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses); + + final ShareLibService shareLibService = services.get(ShareLibService.class); + shareLibService.fs = fs; + + runGivenCallableOnThreads(() -> { + try { + final Path path = shareLibService.getLatestLibPath(rootDir, "lib_"); + Assert.assertEquals(filePath3, path); + } catch (final IOException | NumberFormatException e) { + log.error(e.getMessage()); + Thread.currentThread().interrupt(); + return false; + } + return true; + }, 100, numberOfThreads); + } + + private void runGivenCallableOnThreads( + final Callable<Boolean> callable, final int numberOfCallables, final int numberOfThreads) { + final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); + final List<Callable<Boolean>> callableTasks = new ArrayList<>(); + + for (int i = 0; i < numberOfCallables; ++i) { + callableTasks.add(callable); + } + + // Start 10 thread to do parallel time parsing. Issue is experienced with old code. + List<Future<Boolean>> futures = new ArrayList<>(); + try { + futures = executor.invokeAll(callableTasks); + } catch (final InterruptedException e) { + log.error(e.getMessage()); + Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage()); + } + + // Shut down the executor service to have all tasks finished, then collect the results + awaitTerminationAfterShutdown(executor); + + Assert.assertFalse(futures.isEmpty()); + for (final Future<Boolean> f : futures) { + try { + final Boolean result = f.get(5, TimeUnit.SECONDS); + Assert.assertTrue("Parsed share lib name shall be a valid timestamp", result); + } catch (final InterruptedException | ExecutionException | TimeoutException e) { + log.error(e.getMessage()); + Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage()); + } + } + } + + private Path createAndAddMockedFileStatus(final List<FileStatus> fileStatuses, int y, int m, int d, int h, int min, int s) { + final String date = new SimpleDateFormat("yyyyMMddHHmmss").format( + new Calendar.Builder().setDate(y, m, d).setTimeOfDay(h, min, s).build().getTime()); + final Path filePath = Mockito.mock(Path.class); + final String libName = ShareLibService.SHARE_LIB_PREFIX + date; + Mockito.when(filePath.getName()).thenReturn(libName); + final FileStatus fileStatus = Mockito.mock(FileStatus.class); + Mockito.when(fileStatus.getPath()).thenReturn(filePath); + fileStatuses.add(fileStatus); + return filePath; + } + + private void awaitTerminationAfterShutdown(ExecutorService threadPool) { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + } catch (InterruptedException ex) { + threadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + private void setupSharelibConf(final String file, final String tag) throws ServiceException, IOException { Properties prop = new Properties(); prop.put(tag, TEST_HDFS_HOME + SHARELIB_PATH); http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 310f78a..5708d1e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros) OOZIE-3314 Remove findbugs-filter.xml and convert its contents to annotations (asalamon74 via andras.piros) OOZIE-3321 PySpark example fails (daniel.becker via andras.piros) OOZIE-3315 DateList example fails (daniel.becker via andras.piros)
