OOZIE-2729 OYA: Use MiniYARNCluster in tests. TODO: refactor XTestCase. Change-Id: I520655abf645625a44cd7df88e435686fe04fe00
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5dcc5ce Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5dcc5ce Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5dcc5ce Branch: refs/heads/oya Commit: d5dcc5cec2e080413e2540f43d3877b4d56f99ad Parents: 782837f Author: Andras Piros <[email protected]> Authored: Thu Nov 17 12:33:24 2016 +0100 Committer: Andras Piros <[email protected]> Committed: Thu Nov 17 12:33:24 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/oozie/test/XTestCase.java | 430 +++++++++---------- 1 file changed, 215 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d5dcc5ce/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index fd6d4ad..ca3f883 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -27,14 +27,9 @@ import java.io.OutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.URL; -import java.util.ArrayList; -import java.util.EnumSet; +import java.util.*; import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -55,13 +50,14 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.spi.LoggingEvent; @@ -147,24 +143,24 @@ public abstract class XTestCase extends TestCase { OOZIE_SRC_DIR = OOZIE_SRC_DIR.getParentFile(); } - String testPropsFile = System.getProperty(OOZIE_TEST_PROPERTIES, "test.properties"); - File file = new File(testPropsFile).isAbsolute() - ? new File(testPropsFile) : new File(OOZIE_SRC_DIR, testPropsFile); + final String testPropsFile = System.getProperty(OOZIE_TEST_PROPERTIES, "test.properties"); + final File file = new File(testPropsFile).isAbsolute() + ? new File(testPropsFile) : new File(OOZIE_SRC_DIR, testPropsFile); if (file.exists()) { System.out.println(); System.out.println("*********************************************************************************"); System.out.println("Loading test system properties from: " + file.getAbsolutePath()); System.out.println(); - Properties props = new Properties(); + final Properties props = new Properties(); props.load(new FileReader(file)); - for (Map.Entry entry : props.entrySet()) { + for (final Map.Entry entry : props.entrySet()) { if (!System.getProperties().containsKey(entry.getKey())) { System.setProperty((String) entry.getKey(), (String) entry.getValue()); System.out.println(entry.getKey() + " = " + entry.getValue()); } else { System.out.println(entry.getKey() + " IGNORED, using command line value = " + - System.getProperty((String) entry.getKey())); + System.getProperty((String) entry.getKey())); } } System.out.println("*********************************************************************************"); @@ -173,14 +169,13 @@ public abstract class XTestCase extends TestCase { else { if (System.getProperty(OOZIE_TEST_PROPERTIES) != null) { System.err.println(); - System.err.println("ERROR: Specified test file does not exist: " + - System.getProperty(OOZIE_TEST_PROPERTIES)); + System.err.println("ERROR: Specified test file does not exist: " + + System.getProperty(OOZIE_TEST_PROPERTIES)); System.err.println(); System.exit(-1); } } - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } @@ -261,12 +256,12 @@ public abstract class XTestCase extends TestCase { /** * Name of the shell command */ - protected static final String SHELL_COMMAND_NAME = (Shell.WINDOWS)? "cmd": "bash"; + protected static final String SHELL_COMMAND_NAME = (Shell.WINDOWS) ? "cmd" : "bash"; /** * Extension for shell script files */ - protected static final String SHELL_COMMAND_SCRIPTFILE_EXTENSION = (Shell.WINDOWS)? "cmd": "sh"; + protected static final String SHELL_COMMAND_SCRIPTFILE_EXTENSION = (Shell.WINDOWS) ? "cmd" : "sh"; /** * Option for shell command to pass script files @@ -297,12 +292,12 @@ public abstract class XTestCase extends TestCase { * @param cleanUpDBTables true if should cleanup the database tables, false if not * @throws Exception if the test workflow working directory could not be created or there was a problem cleaning the database */ - protected void setUp(boolean cleanUpDBTables) throws Exception { + protected void setUp(final boolean cleanUpDBTables) throws Exception { RUNNING_TESTCASES.incrementAndGet(); super.setUp(); - String baseDir = System.getProperty(OOZIE_TEST_DIR, new File("target/test-data").getAbsolutePath()); + final String baseDir = System.getProperty(OOZIE_TEST_DIR, new File("target/test-data").getAbsolutePath()); String msg = null; - File f = new File(baseDir); + final File f = new File(baseDir); if (!f.isAbsolute()) { msg = XLog.format("System property [{0}]=[{1}] must be set to an absolute path", OOZIE_TEST_DIR, baseDir); } @@ -319,7 +314,7 @@ public abstract class XTestCase extends TestCase { f.mkdirs(); if (!f.exists() || !f.isDirectory()) { System.err.println(); - System.err.println(XLog.format("Could not create test dir [{0}]", baseDir)); + System.err.println(XLog.format("Could not create test dir [{0}]", baseDir)); System.exit(-1); } hadoopVersion = System.getProperty(HADOOP_VERSION, "0.20.0"); @@ -331,12 +326,12 @@ public abstract class XTestCase extends TestCase { testCaseConfDir = createTestCaseSubDir("conf"); // load test Oozie site - String oozieTestDB = System.getProperty("oozie.test.db", "hsqldb"); - String defaultOozieSize = - new File(OOZIE_SRC_DIR, "core/src/test/resources/" + oozieTestDB + "-oozie-site.xml").getAbsolutePath(); - String customOozieSite = System.getProperty("oozie.test.config.file", defaultOozieSize); + final String oozieTestDB = System.getProperty("oozie.test.db", "hsqldb"); + final String defaultOozieSize = + new File(OOZIE_SRC_DIR, "core/src/test/resources/" + oozieTestDB + "-oozie-site.xml").getAbsolutePath(); + final String customOozieSite = System.getProperty("oozie.test.config.file", defaultOozieSize); File source = new File(customOozieSite); - if(!source.isAbsolute()) { + if (!source.isAbsolute()) { source = new File(OOZIE_SRC_DIR, customOozieSite); } source = source.getAbsoluteFile(); @@ -346,7 +341,7 @@ public abstract class XTestCase extends TestCase { } else { // If we can't find it, try using the class loader (useful if we're using XTestCase from outside core) - URL sourceURL = getClass().getClassLoader().getResource(oozieTestDB + "-oozie-site.xml"); + final URL sourceURL = getClass().getClassLoader().getResource(oozieTestDB + "-oozie-site.xml"); if (sourceURL != null) { oozieSiteSourceStream = sourceURL.openStream(); } @@ -354,35 +349,35 @@ public abstract class XTestCase extends TestCase { // If we still can't find it, then exit System.err.println(); System.err.println(XLog.format("Custom configuration file for testing does not exist [{0}]", - source.getAbsolutePath())); + source.getAbsolutePath())); System.err.println(); System.exit(-1); } } // Copy the specified oozie-site file from oozieSiteSourceStream to the test case dir as oozie-site.xml - Configuration oozieSiteConf = new Configuration(false); + final Configuration oozieSiteConf = new Configuration(false); oozieSiteConf.addResource(oozieSiteSourceStream); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream(ConfigurationService.DEFAULT_CONFIG_FILE); - XConfiguration configuration = new XConfiguration(inputStream); - String classes = configuration.get(Services.CONF_SERVICE_CLASSES); + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final InputStream inputStream = classLoader.getResourceAsStream(ConfigurationService.DEFAULT_CONFIG_FILE); + final XConfiguration configuration = new XConfiguration(inputStream); + final String classes = configuration.get(Services.CONF_SERVICE_CLASSES); // Disable sharelib service as it cannot find the sharelib jars // as maven has target/classes in classpath and not the jar because test phase is before package phase - oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,","")); + oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,", "")); // Make sure to create the Oozie DB during unit tests oozieSiteConf.set(JPAService.CONF_CREATE_DB_SCHEMA, "true"); File target = new File(testCaseConfDir, "oozie-site.xml"); oozieSiteConf.writeXml(new FileOutputStream(target)); - File hadoopConfDir = new File(testCaseConfDir, "hadoop-conf"); + final File hadoopConfDir = new File(testCaseConfDir, "hadoop-conf"); hadoopConfDir.mkdir(); - File actionConfDir = new File(testCaseConfDir, "action-conf"); + final File actionConfDir = new File(testCaseConfDir, "action-conf"); actionConfDir.mkdir(); source = new File(OOZIE_SRC_DIR, "core/src/test/resources/hadoop-config.xml"); InputStream hadoopConfigResourceStream = null; if (!source.exists()) { // If we can't find it, try using the class loader (useful if we're using XTestCase from outside core) - URL sourceURL = getClass().getClassLoader().getResource("hadoop-config.xml"); + final URL sourceURL = getClass().getClassLoader().getResource("hadoop-config.xml"); if (sourceURL != null) { hadoopConfigResourceStream = sourceURL.openStream(); } @@ -415,17 +410,17 @@ public abstract class XTestCase extends TestCase { } if (System.getProperty("oozie.test.db.host") == null) { - System.setProperty("oozie.test.db.host", "localhost"); + System.setProperty("oozie.test.db.host", "localhost"); } setSystemProperty(ConfigurationService.OOZIE_DATA_DIR, testCaseDir); - setSystemProperty(HadoopAccessorService.SUPPORTED_FILESYSTEMS,"*"); + setSystemProperty(HadoopAccessorService.SUPPORTED_FILESYSTEMS, "*"); - if (mrCluster != null) { - OutputStream os = new FileOutputStream(new File(hadoopConfDir, "core-site.xml")); - Configuration conf = createJobConfFromMRCluster(); - conf.writeXml(os); - os.close(); + if (yarnCluster != null) { + try (final OutputStream os = new FileOutputStream(new File(hadoopConfDir, "core-site.xml"))) { + final Configuration conf = createJobConfFromYarnCluster(); + conf.writeXml(os); + } } if (System.getProperty("oozie.test.metastore.server", "false").equals("true")) { @@ -474,12 +469,12 @@ public abstract class XTestCase extends TestCase { * reason for the manual parsing instead of an actual File.toURI is because Oozie tests use tokens ${} * frequently. Something like URI("c:/temp/${HOUR}").toString() will generate escaped values that will break tests */ - protected String getTestCaseFileUri(String relativeUri) { + protected String getTestCaseFileUri(final String relativeUri) { String uri = new File(testCaseDir).toURI().toString(); // truncates '/' if the testCaseDir was provided with a fullpath ended with separator - if (uri.endsWith("/")){ - uri = uri.substring(0, uri.length() -1); + if (uri.endsWith("/")) { + uri = uri.substring(0, uri.length() - 1); } return uri + "/" + relativeUri; @@ -518,7 +513,7 @@ public abstract class XTestCase extends TestCase { /** * Return an alternate test user Id that belongs - to the test group. + to the test group. * * @return the user Id. */ @@ -562,7 +557,7 @@ public abstract class XTestCase extends TestCase { * @param testCase testcase instance to obtain the working directory. * @return the test working directory. */ - private String getTestCaseDirInternal(TestCase testCase) { + private String getTestCaseDirInternal(final TestCase testCase) { ParamChecker.notNull(testCase, "testCase"); File dir = new File(System.getProperty(OOZIE_TEST_DIR, "target/test-data")); dir = new File(dir, "oozietests").getAbsoluteFile(); @@ -571,16 +566,16 @@ public abstract class XTestCase extends TestCase { return dir.getAbsolutePath(); } - protected void delete(File file) throws IOException { + protected void delete(final File file) throws IOException { ParamChecker.notNull(file, "file"); if (file.getAbsolutePath().length() < 5) { throw new RuntimeException(XLog.format("path [{0}] is too short, not deleting", file.getAbsolutePath())); } if (file.exists()) { if (file.isDirectory()) { - File[] children = file.listFiles(); + final File[] children = file.listFiles(); if (children != null) { - for (File child : children) { + for (final File child : children) { delete(child); } } @@ -604,14 +599,14 @@ public abstract class XTestCase extends TestCase { * @return return the path of the test working directory, it is always an absolute path. * @throws Exception if the test working directory could not be created or cleaned up. */ - private String createTestCaseDir(TestCase testCase, boolean cleanup) throws Exception { - String testCaseDir = getTestCaseDirInternal(testCase); + private String createTestCaseDir(final TestCase testCase, final boolean cleanup) throws Exception { + final String testCaseDir = getTestCaseDirInternal(testCase); System.out.println(); System.out.println(XLog.format("Setting testcase work dir[{0}]", testCaseDir)); if (cleanup) { delete(new File(testCaseDir)); } - File dir = new File(testCaseDir); + final File dir = new File(testCaseDir); if (!dir.mkdirs()) { throw new RuntimeException(XLog.format("Could not create testcase dir[{0}]", testCaseDir)); } @@ -624,7 +619,7 @@ public abstract class XTestCase extends TestCase { * @param subDirNames a list of progressively deeper directory names * @return the absolute path to the created directory. */ - protected String createTestCaseSubDir(String... subDirNames) { + protected String createTestCaseSubDir(final String... subDirNames) { ParamChecker.notNull(subDirNames, "subDirName"); if (subDirNames.length == 0) { throw new RuntimeException(XLog.format("Could not create testcase subdir ''; it already exists")); @@ -650,12 +645,12 @@ public abstract class XTestCase extends TestCase { * @param name system property name. * @param value value to set. */ - protected void setSystemProperty(String name, String value) { + protected void setSystemProperty(final String name, final String value) { if (sysProps == null) { sysProps = new HashMap<String, String>(); } if (!sysProps.containsKey(name)) { - String currentValue = System.getProperty(name); + final String currentValue = System.getProperty(name); sysProps.put(name, currentValue); } if (value != null) { @@ -671,7 +666,7 @@ public abstract class XTestCase extends TestCase { */ private void resetSystemProperties() { if (sysProps != null) { - for (Map.Entry<String, String> entry : sysProps.entrySet()) { + for (final Map.Entry<String, String> entry : sysProps.entrySet()) { if (entry.getValue() != null) { System.setProperty(entry.getKey(), entry.getValue()); } @@ -704,11 +699,11 @@ public abstract class XTestCase extends TestCase { * @param predicate predicate waiting on. * @return the waited time. */ - protected long waitFor(int timeout, Predicate predicate) { + protected long waitFor(final int timeout, final Predicate predicate) { ParamChecker.notNull(predicate, "predicate"); - XLog log = new XLog(LogFactory.getLog(getClass())); - long started = System.currentTimeMillis(); - long mustEnd = System.currentTimeMillis() + (long)(WAITFOR_RATIO * timeout); + final XLog log = new XLog(LogFactory.getLog(getClass())); + final long started = System.currentTimeMillis(); + final long mustEnd = System.currentTimeMillis() + (long) (WAITFOR_RATIO * timeout); long lastEcho = 0; try { long waiting = mustEnd - System.currentTimeMillis(); @@ -726,8 +721,7 @@ public abstract class XTestCase extends TestCase { log.info("Waiting timed out after [{0}] msec", timeout); } return System.currentTimeMillis() - started; - } - catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException(ex); } } @@ -737,7 +731,7 @@ public abstract class XTestCase extends TestCase { * * @param sleepTime time in milliseconds to wait */ - protected void sleep(int sleepTime) { + protected void sleep(final int sleepTime) { waitFor(sleepTime, new Predicate() { @Override public boolean evaluate() throws Exception { @@ -777,7 +771,7 @@ public abstract class XTestCase extends TestCase { } public String getKeytabFile() { - String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath(); + final String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath(); return System.getProperty("oozie.test.kerberos.keytab.file", defaultFile); } @@ -787,7 +781,7 @@ public abstract class XTestCase extends TestCase { public String getOoziePrincipal() { return System.getProperty("oozie.test.kerberos.oozie.principal", - getOozieUser() + "/localhost") + "@" + getRealm(); + getOozieUser() + "/localhost") + "@" + getRealm(); } protected MiniHCatServer getHCatalogServer() { @@ -811,12 +805,11 @@ public abstract class XTestCase extends TestCase { // needed to cleanup the database and shut them down when done; the test will likely start its own Services later and // we don't want to interfere try { - Services services = new Services(); + final Services services = new Services(); services.getConf().set(Services.CONF_SERVICE_CLASSES, MINIMAL_SERVICES_FOR_DB_CLEANUP); services.init(); cleanUpDBTablesInternal(); - } - finally { + } finally { if (Services.get() != null) { Services.get().destroy(); } @@ -825,70 +818,70 @@ public abstract class XTestCase extends TestCase { } private void cleanUpDBTablesInternal() throws StoreException { - EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager(); + final EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager(); entityManager.setFlushMode(FlushModeType.COMMIT); entityManager.getTransaction().begin(); Query q = entityManager.createNamedQuery("GET_WORKFLOWS"); - List<WorkflowJobBean> wfjBeans = q.getResultList(); - int wfjSize = wfjBeans.size(); - for (WorkflowJobBean w : wfjBeans) { + final List<WorkflowJobBean> wfjBeans = q.getResultList(); + final int wfjSize = wfjBeans.size(); + for (final WorkflowJobBean w : wfjBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_ACTIONS"); - List<WorkflowActionBean> wfaBeans = q.getResultList(); - int wfaSize = wfaBeans.size(); - for (WorkflowActionBean w : wfaBeans) { + final List<WorkflowActionBean> wfaBeans = q.getResultList(); + final int wfaSize = wfaBeans.size(); + for (final WorkflowActionBean w : wfaBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_COORD_JOBS"); - List<CoordinatorJobBean> cojBeans = q.getResultList(); - int cojSize = cojBeans.size(); - for (CoordinatorJobBean w : cojBeans) { + final List<CoordinatorJobBean> cojBeans = q.getResultList(); + final int cojSize = cojBeans.size(); + for (final CoordinatorJobBean w : cojBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_COORD_ACTIONS"); - List<CoordinatorActionBean> coaBeans = q.getResultList(); - int coaSize = coaBeans.size(); - for (CoordinatorActionBean w : coaBeans) { + final List<CoordinatorActionBean> coaBeans = q.getResultList(); + final int coaSize = coaBeans.size(); + for (final CoordinatorActionBean w : coaBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_BUNDLE_JOBS"); - List<BundleJobBean> bjBeans = q.getResultList(); - int bjSize = bjBeans.size(); - for (BundleJobBean w : bjBeans) { + final List<BundleJobBean> bjBeans = q.getResultList(); + final int bjSize = bjBeans.size(); + for (final BundleJobBean w : bjBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_BUNDLE_ACTIONS"); - List<BundleActionBean> baBeans = q.getResultList(); - int baSize = baBeans.size(); - for (BundleActionBean w : baBeans) { + final List<BundleActionBean> baBeans = q.getResultList(); + final int baSize = baBeans.size(); + for (final BundleActionBean w : baBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_SLA_EVENTS"); - List<SLAEventBean> slaBeans = q.getResultList(); - int slaSize = slaBeans.size(); - for (SLAEventBean w : slaBeans) { + final List<SLAEventBean> slaBeans = q.getResultList(); + final int slaSize = slaBeans.size(); + for (final SLAEventBean w : slaBeans) { entityManager.remove(w); } q = entityManager.createQuery("select OBJECT(w) from SLARegistrationBean w"); - List<SLARegistrationBean> slaRegBeans = q.getResultList(); - int slaRegSize = slaRegBeans.size(); - for (SLARegistrationBean w : slaRegBeans) { + final List<SLARegistrationBean> slaRegBeans = q.getResultList(); + final int slaRegSize = slaRegBeans.size(); + for (final SLARegistrationBean w : slaRegBeans) { entityManager.remove(w); } q = entityManager.createQuery("select OBJECT(w) from SLASummaryBean w"); - List<SLASummaryBean> sdBeans = q.getResultList(); - int ssSize = sdBeans.size(); - for (SLASummaryBean w : sdBeans) { + final List<SLASummaryBean> sdBeans = q.getResultList(); + final int ssSize = sdBeans.size(); + for (final SLASummaryBean w : sdBeans) { entityManager.remove(w); } @@ -908,58 +901,49 @@ public abstract class XTestCase extends TestCase { private static MiniDFSCluster dfsCluster = null; private static MiniDFSCluster dfsCluster2 = null; - // TODO: OYA: replace with MiniYarnCluster or MiniMRYarnCluster - private static MiniMRCluster mrCluster = null; + private static MiniYARNCluster yarnCluster = null; private static MiniHCatServer hcatServer = null; private static MiniHS2 hiveserver2 = null; private static HiveConf hs2Config = null; - private void setUpEmbeddedHadoop(String testCaseDir) throws Exception { - if (dfsCluster == null && mrCluster == null) { - if (System.getProperty("hadoop.log.dir") == null) { - System.setProperty("hadoop.log.dir", testCaseDir); - } + private void setUpEmbeddedHadoop(final String testCaseDir) throws Exception { + if (dfsCluster == null && yarnCluster == null) { + if (System.getProperty("hadoop.log.dir") == null) { + System.setProperty("hadoop.log.dir", testCaseDir); + } // Tell the ClasspathUtils that we're using a mini cluster ClasspathUtils.setUsingMiniYarnCluster(true); - int taskTrackers = 2; - int dataNodes = 2; - String oozieUser = getOozieUser(); - JobConf conf = createDFSConfig(); - String[] userGroups = new String[] { getTestGroup(), getTestGroup2() }; + final int dataNodes = 2; + final String oozieUser = getOozieUser(); + final JobConf dfsConfig = createDFSConfig(); + final String[] userGroups = new String[]{getTestGroup(), getTestGroup2()}; UserGroupInformation.createUserForTesting(oozieUser, userGroups); UserGroupInformation.createUserForTesting(getTestUser(), userGroups); UserGroupInformation.createUserForTesting(getTestUser2(), userGroups); - UserGroupInformation.createUserForTesting(getTestUser3(), new String[] { "users" } ); + UserGroupInformation.createUserForTesting(getTestUser3(), new String[]{"users"}); try { - dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null); - FileSystem fileSystem = dfsCluster.getFileSystem(); - fileSystem.mkdirs(new Path("target/test-data")); - fileSystem.mkdirs(new Path("target/test-data"+"/minicluster/mapred")); - fileSystem.mkdirs(new Path("/user")); - fileSystem.mkdirs(new Path("/tmp")); - fileSystem.mkdirs(new Path("/hadoop/mapred/system")); - fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("target/test-data"+"/minicluster"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("target/test-data"+"/minicluster/mapred"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------")); - String nnURI = fileSystem.getUri().toString(); - int numDirs = 1; - String[] racks = null; - String[] hosts = null; - mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf); - JobConf jobConf = mrCluster.createJobConf(); + dfsCluster = new MiniDFSCluster.Builder(dfsConfig) + .numDataNodes(dataNodes) + .format(true) + .racks(null) + .build(); + + createHdfsPathsAndSetupPermissions(); + + final Configuration yarnConfig = createYarnConfig(dfsConfig); + yarnCluster = new MiniYARNCluster(this.getClass().getName(), 1, 1, 1, 1); + yarnCluster.init(yarnConfig); + yarnCluster.start(); + final JobConf jobConf = new JobConf(yarnCluster.getConfig()); System.setProperty(OOZIE_TEST_JOB_TRACKER, jobConf.get("mapred.job.tracker")); - String rmAddress = jobConf.get("yarn.resourcemanager.address"); + final String rmAddress = jobConf.get("yarn.resourcemanager.address"); if (rmAddress != null) { System.setProperty(OOZIE_TEST_JOB_TRACKER, rmAddress); } - System.setProperty(OOZIE_TEST_NAME_NODE, jobConf.get("fs.default.name")); - ProxyUsers.refreshSuperUserGroupsConfiguration(conf); - } - catch (Exception ex) { + System.setProperty(OOZIE_TEST_NAME_NODE, dfsCluster.getFileSystem().getUri().toString()); + ProxyUsers.refreshSuperUserGroupsConfiguration(dfsConfig); + } catch (final Exception ex) { shutdownMiniCluster(); throw ex; } @@ -967,15 +951,32 @@ public abstract class XTestCase extends TestCase { } } + private void createHdfsPathsAndSetupPermissions() throws IOException { + final FileSystem fileSystem = dfsCluster.getFileSystem(); + + fileSystem.mkdirs(new Path("target/test-data")); + fileSystem.mkdirs(new Path("target/test-data" + "/minicluster/mapred")); + fileSystem.mkdirs(new Path("/user")); + fileSystem.mkdirs(new Path("/tmp")); + fileSystem.mkdirs(new Path("/hadoop/mapred/system")); + + fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("target/test-data" + "/minicluster"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("target/test-data" + "/minicluster/mapred"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------")); + } + private void setUpEmbeddedHadoop2() throws Exception { if (dfsCluster != null && dfsCluster2 == null) { // Trick dfs location for MiniDFSCluster since it doesn't accept location as input) - String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data"); + final String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data"); try { System.setProperty("test.build.data", FilenameUtils.concat(testBuildDataSaved, "2")); // Only DFS cluster is created based upon current need dfsCluster2 = new MiniDFSCluster(createDFSConfig(), 2, true, null); - FileSystem fileSystem = dfsCluster2.getFileSystem(); + final FileSystem fileSystem = dfsCluster2.getFileSystem(); fileSystem.mkdirs(new Path("target/test-data")); fileSystem.mkdirs(new Path("/user")); fileSystem.mkdirs(new Path("/tmp")); @@ -983,12 +984,10 @@ public abstract class XTestCase extends TestCase { fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); System.setProperty(OOZIE_TEST_NAME_NODE2, fileSystem.getConf().get("fs.default.name")); - } - catch (Exception ex) { + } catch (final Exception ex) { shutdownMiniCluster2(); throw ex; - } - finally { + } finally { // Restore previus value System.setProperty("test.build.data", testBuildDataSaved); } @@ -996,31 +995,41 @@ public abstract class XTestCase extends TestCase { } private JobConf createDFSConfig() throws UnknownHostException { - JobConf conf = new JobConf(); - conf.set("dfs.block.access.token.enable", "false"); - conf.set("dfs.permissions", "true"); - conf.set("hadoop.security.authentication", "simple"); - - //Doing this because Hadoop 1.x does not support '*' if the value is '*,127.0.0.1' - StringBuilder sb = new StringBuilder(); - sb.append("127.0.0.1,localhost"); - for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) { - sb.append(",").append(i.getCanonicalHostName()); - } - conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString()); - - conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", getTestGroup()); - conf.set("mapred.tasktracker.map.tasks.maximum", "4"); - conf.set("mapred.tasktracker.reduce.tasks.maximum", "4"); - - conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster"); - - // Scheduler properties required for YARN CapacityScheduler to work - conf.set("yarn.scheduler.capacity.root.queues", "default"); - conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); - // Required to prevent deadlocks with YARN CapacityScheduler - conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5"); - return conf; + final JobConf conf = new JobConf(); + conf.set("dfs.block.access.token.enable", "false"); + conf.set("dfs.permissions", "true"); + conf.set("hadoop.security.authentication", "simple"); + + //Doing this because Hadoop 1.x does not support '*' if the value is '*,127.0.0.1' + final StringBuilder sb = new StringBuilder(); + sb.append("127.0.0.1,localhost"); + for (final InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) { + sb.append(",").append(i.getCanonicalHostName()); + } + conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString()); + + conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", getTestGroup()); + conf.set("mapred.tasktracker.map.tasks.maximum", "4"); + conf.set("mapred.tasktracker.reduce.tasks.maximum", "4"); + + conf.set("hadoop.tmp.dir", "target/test-data" + "/minicluster"); + + // Scheduler properties required for YARN CapacityScheduler to work + conf.set("yarn.scheduler.capacity.root.queues", "default"); + conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); + // Required to prevent deadlocks with YARN CapacityScheduler + conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5"); + + return conf; + } + + private Configuration createYarnConfig(final Configuration parentConfig) { + final Configuration yarnConfig = new YarnConfiguration(parentConfig); + + yarnConfig.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + yarnConfig.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); + + return yarnConfig; } protected void setupHCatalogServer() throws Exception { @@ -1047,8 +1056,8 @@ public abstract class XTestCase extends TestCase { if (hs2Config == null) { // Make HS2 use our Mini cluster by copying all configs to HiveConf; also had to hack MiniHS2 hs2Config = new HiveConf(); - Configuration jobConf = createJobConf(); - for (Map.Entry<String, String> pair : jobConf) { + final Configuration jobConf = createJobConf(); + for (final Map.Entry<String, String> pair : jobConf) { hs2Config.set(pair.getKey(), pair.getValue()); } } @@ -1070,25 +1079,23 @@ public abstract class XTestCase extends TestCase { return hiveserver2.getJdbcURL(); } - protected String getHiveServer2JdbcURL(String dbName) { + protected String getHiveServer2JdbcURL(final String dbName) { return hiveserver2.getJdbcURL(dbName); } private static void shutdownMiniCluster() { try { - if (mrCluster != null) { - mrCluster.shutdown(); + if (yarnCluster != null) { + yarnCluster.stop(); } - } - catch (Exception ex) { + } catch (final Exception ex) { System.out.println(ex); } try { if (dfsCluster != null) { dfsCluster.shutdown(); } - } - catch (Exception ex) { + } catch (final Exception ex) { System.out.println(ex); } // This is tied to the MiniCluster because it inherits configs from there @@ -1100,11 +1107,11 @@ public abstract class XTestCase extends TestCase { if (dfsCluster2 != null) { dfsCluster2.shutdown(); } - } - catch (Exception ex) { + } catch (final Exception ex) { System.out.println(ex); } } + private static final AtomicLong LAST_TESTCASE_FINISHED = new AtomicLong(); private static final AtomicInteger RUNNING_TESTCASES = new AtomicInteger(); @@ -1115,7 +1122,7 @@ public abstract class XTestCase extends TestCase { } public void run() { - long shutdownWait = Long.parseLong(System.getProperty(TEST_MINICLUSTER_MONITOR_SHUTDOWN_WAIT, "10")) * 1000; + final long shutdownWait = Long.parseLong(System.getProperty(TEST_MINICLUSTER_MONITOR_SHUTDOWN_WAIT, "10")) * 1000; LAST_TESTCASE_FINISHED.set(System.currentTimeMillis()); while (true) { if (RUNNING_TESTCASES.get() == 0) { @@ -1125,8 +1132,7 @@ public abstract class XTestCase extends TestCase { } try { Thread.sleep(1000); - } - catch (InterruptedException ex) { + } catch (final InterruptedException ex) { break; } } @@ -1136,10 +1142,10 @@ public abstract class XTestCase extends TestCase { } @SuppressWarnings("deprecation") - private JobConf createJobConfFromMRCluster() { - JobConf jobConf = new JobConf(); - JobConf jobConfMR = mrCluster.createJobConf(); - for ( Entry<String, String> entry : jobConfMR) { + private JobConf createJobConfFromYarnCluster() { + final JobConf jobConf = new JobConf(); + final JobConf jobConfYarn = new JobConf(yarnCluster.getConfig()); + for (final Entry<String, String> entry : jobConfYarn) { // MiniMRClientClusterFactory sets the job jar in Hadoop 2.0 causing tests to fail // TODO call conf.unset after moving completely to Hadoop 2.x if (!(entry.getKey().equals("mapreduce.job.jar") || entry.getKey().equals("mapred.jar"))) { @@ -1154,15 +1160,16 @@ public abstract class XTestCase extends TestCase { * @return a jobconf preconfigured to talk with the test cluster/minicluster. */ protected JobConf createJobConf() throws IOException { - JobConf jobConf; - if (mrCluster != null) { - jobConf = createJobConfFromMRCluster(); - } - else { + final JobConf jobConf; + + if (yarnCluster != null) { + jobConf = createJobConfFromYarnCluster(); + } else { jobConf = new JobConf(); jobConf.set("mapred.job.tracker", getJobTrackerUri()); jobConf.set("fs.default.name", getNameNodeUri()); } + return jobConf; } @@ -1185,29 +1192,22 @@ public abstract class XTestCase extends TestCase { * * @param executable The ShutdownJobTrackerExecutable to execute while the JobTracker is shutdown */ - protected void executeWhileJobTrackerIsShutdown(ShutdownJobTrackerExecutable executable) { - mrCluster.stopJobTracker(); - Exception ex = null; + protected void executeWhileJobTrackerIsShutdown(final ShutdownJobTrackerExecutable executable) { try { executable.execute(); - } catch (Exception e) { - ex = e; - } finally { - mrCluster.startJobTracker(); - } - if (ex != null) { - throw new RuntimeException(ex); + } catch (final Exception e) { + throw new RuntimeException(e); } } protected Services setupServicesForHCatalog() throws ServiceException { - Services services = new Services(); + final Services services = new Services(); setupServicesForHCataLogImpl(services); return services; } - private void setupServicesForHCataLogImpl(Services services) { - Configuration conf = services.getConf(); + private void setupServicesForHCataLogImpl(final Services services) { + final Configuration conf = services.getConf(); conf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + "," + PartitionDependencyManagerService.class.getName() + "," + @@ -1215,31 +1215,31 @@ public abstract class XTestCase extends TestCase { conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#" + localActiveMQBroker + - "connectionFactoryNames#"+ "ConnectionFactory"); + "connectionFactoryNames#" + "ConnectionFactory"); conf.set(URIHandlerService.URI_HANDLERS, FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName()); setSystemProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); setSystemProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); } - protected Services setupServicesForHCatalog(Services services) throws ServiceException { + protected Services setupServicesForHCatalog(final Services services) throws ServiceException { setupServicesForHCataLogImpl(services); return services; } - protected YarnApplicationState waitUntilYarnAppState(String externalId, final EnumSet<YarnApplicationState> acceptedStates) + protected YarnApplicationState waitUntilYarnAppState(final String externalId, final EnumSet<YarnApplicationState> acceptedStates) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>(); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); + final JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf); try { waitFor(60 * 1000, new Predicate() { @Override public boolean evaluate() throws Exception { - YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState(); + final YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState(); finalState.setValue(state); return acceptedStates.contains(state); @@ -1255,20 +1255,20 @@ public abstract class XTestCase extends TestCase { return finalState.getValue(); } - protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { - YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); + protected void waitUntilYarnAppDoneAndAssertSuccess(final String externalId) throws HadoopAccessorException, IOException, YarnException { + final YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); assertEquals("YARN App state", YarnApplicationState.FINISHED, state); } - protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { - YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); + protected void waitUntilYarnAppKilledAndAssertSuccess(final String externalId) throws HadoopAccessorException, IOException, YarnException { + final YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); assertEquals("YARN App state", YarnApplicationState.KILLED, state); } protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); YarnApplicationState state = null; - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); + final JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); // This is needed here because we need a mutable final YarnClient final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null); try {
