Revert "OOZIE-2729 OYA: Use MiniYARNCluster in tests. TODO: refactor XTestCase."
This reverts commit d5dcc5cec2e080413e2540f43d3877b4d56f99ad. Change-Id: Iefe037a8477591a7554b31fe81a399d7e1f86f00 Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/67dca9c3 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/67dca9c3 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/67dca9c3 Branch: refs/heads/oya Commit: 67dca9c31016a3bf7ad00037f1750fce988f1e76 Parents: e5070b1 Author: Peter Bacsko <[email protected]> Authored: Mon Nov 28 14:04:13 2016 +0100 Committer: Peter Bacsko <[email protected]> Committed: Mon Nov 28 14:04:13 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/oozie/test/XTestCase.java | 440 +++++++++---------- 1 file changed, 217 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/67dca9c3/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 784c578..711d41d 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -27,9 +27,14 @@ import java.io.OutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.URL; -import java.util.*; +import java.util.ArrayList; +import java.util.EnumSet; import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -50,12 +55,12 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.AppenderSkeleton; @@ -142,24 +147,24 @@ public abstract class XTestCase extends TestCase { OOZIE_SRC_DIR = OOZIE_SRC_DIR.getParentFile(); } - final String testPropsFile = System.getProperty(OOZIE_TEST_PROPERTIES, "test.properties"); - final File file = new File(testPropsFile).isAbsolute() - ? new File(testPropsFile) : new File(OOZIE_SRC_DIR, testPropsFile); + String testPropsFile = System.getProperty(OOZIE_TEST_PROPERTIES, "test.properties"); + File file = new File(testPropsFile).isAbsolute() + ? new File(testPropsFile) : new File(OOZIE_SRC_DIR, testPropsFile); if (file.exists()) { System.out.println(); System.out.println("*********************************************************************************"); System.out.println("Loading test system properties from: " + file.getAbsolutePath()); System.out.println(); - final Properties props = new Properties(); + Properties props = new Properties(); props.load(new FileReader(file)); - for (final Map.Entry entry : props.entrySet()) { + for (Map.Entry entry : props.entrySet()) { if (!System.getProperties().containsKey(entry.getKey())) { System.setProperty((String) entry.getKey(), (String) entry.getValue()); System.out.println(entry.getKey() + " = " + entry.getValue()); } else { System.out.println(entry.getKey() + " IGNORED, using command line value = " + - System.getProperty((String) entry.getKey())); + System.getProperty((String) entry.getKey())); } } System.out.println("*********************************************************************************"); @@ -168,13 +173,14 @@ public abstract class XTestCase extends TestCase { else { if (System.getProperty(OOZIE_TEST_PROPERTIES) != null) { System.err.println(); - System.err.println("ERROR: Specified test file does not exist: " + - System.getProperty(OOZIE_TEST_PROPERTIES)); + System.err.println("ERROR: Specified test file does not exist: " + + System.getProperty(OOZIE_TEST_PROPERTIES)); System.err.println(); System.exit(-1); } } - } catch (final IOException ex) { + } + catch (IOException ex) { throw new RuntimeException(ex); } @@ -255,12 +261,12 @@ public abstract class XTestCase extends TestCase { /** * Name of the shell command */ - protected static final String SHELL_COMMAND_NAME = (Shell.WINDOWS) ? "cmd" : "bash"; + protected static final String SHELL_COMMAND_NAME = (Shell.WINDOWS)? "cmd": "bash"; /** * Extension for shell script files */ - protected static final String SHELL_COMMAND_SCRIPTFILE_EXTENSION = (Shell.WINDOWS) ? "cmd" : "sh"; + protected static final String SHELL_COMMAND_SCRIPTFILE_EXTENSION = (Shell.WINDOWS)? "cmd": "sh"; /** * Option for shell command to pass script files @@ -291,12 +297,12 @@ public abstract class XTestCase extends TestCase { * @param cleanUpDBTables true if should cleanup the database tables, false if not * @throws Exception if the test workflow working directory could not be created or there was a problem cleaning the database */ - protected void setUp(final boolean cleanUpDBTables) throws Exception { + protected void setUp(boolean cleanUpDBTables) throws Exception { RUNNING_TESTCASES.incrementAndGet(); super.setUp(); - final String baseDir = System.getProperty(OOZIE_TEST_DIR, new File("target/test-data").getAbsolutePath()); + String baseDir = System.getProperty(OOZIE_TEST_DIR, new File("target/test-data").getAbsolutePath()); String msg = null; - final File f = new File(baseDir); + File f = new File(baseDir); if (!f.isAbsolute()) { msg = XLog.format("System property [{0}]=[{1}] must be set to an absolute path", OOZIE_TEST_DIR, baseDir); } @@ -313,7 +319,7 @@ public abstract class XTestCase extends TestCase { f.mkdirs(); if (!f.exists() || !f.isDirectory()) { System.err.println(); - System.err.println(XLog.format("Could not create test dir [{0}]", baseDir)); + System.err.println(XLog.format("Could not create test dir [{0}]", baseDir)); System.exit(-1); } hadoopVersion = System.getProperty(HADOOP_VERSION, "0.20.0"); @@ -325,12 +331,12 @@ public abstract class XTestCase extends TestCase { testCaseConfDir = createTestCaseSubDir("conf"); // load test Oozie site - final String oozieTestDB = System.getProperty("oozie.test.db", "hsqldb"); - final String defaultOozieSize = - new File(OOZIE_SRC_DIR, "core/src/test/resources/" + oozieTestDB + "-oozie-site.xml").getAbsolutePath(); - final String customOozieSite = System.getProperty("oozie.test.config.file", defaultOozieSize); + String oozieTestDB = System.getProperty("oozie.test.db", "hsqldb"); + String defaultOozieSize = + new File(OOZIE_SRC_DIR, "core/src/test/resources/" + oozieTestDB + "-oozie-site.xml").getAbsolutePath(); + String customOozieSite = System.getProperty("oozie.test.config.file", defaultOozieSize); File source = new File(customOozieSite); - if (!source.isAbsolute()) { + if(!source.isAbsolute()) { source = new File(OOZIE_SRC_DIR, customOozieSite); } source = source.getAbsoluteFile(); @@ -340,7 +346,7 @@ public abstract class XTestCase extends TestCase { } else { // If we can't find it, try using the class loader (useful if we're using XTestCase from outside core) - final URL sourceURL = getClass().getClassLoader().getResource(oozieTestDB + "-oozie-site.xml"); + URL sourceURL = getClass().getClassLoader().getResource(oozieTestDB + "-oozie-site.xml"); if (sourceURL != null) { oozieSiteSourceStream = sourceURL.openStream(); } @@ -348,35 +354,35 @@ public abstract class XTestCase extends TestCase { // If we still can't find it, then exit System.err.println(); System.err.println(XLog.format("Custom configuration file for testing does not exist [{0}]", - source.getAbsolutePath())); + source.getAbsolutePath())); System.err.println(); System.exit(-1); } } // Copy the specified oozie-site file from oozieSiteSourceStream to the test case dir as oozie-site.xml - final Configuration oozieSiteConf = new Configuration(false); + Configuration oozieSiteConf = new Configuration(false); oozieSiteConf.addResource(oozieSiteSourceStream); - final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final InputStream inputStream = classLoader.getResourceAsStream(ConfigurationService.DEFAULT_CONFIG_FILE); - final XConfiguration configuration = new XConfiguration(inputStream); - final String classes = configuration.get(Services.CONF_SERVICE_CLASSES); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream(ConfigurationService.DEFAULT_CONFIG_FILE); + XConfiguration configuration = new XConfiguration(inputStream); + String classes = configuration.get(Services.CONF_SERVICE_CLASSES); // Disable sharelib service as it cannot find the sharelib jars // as maven has target/classes in classpath and not the jar because test phase is before package phase - oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,", "")); + oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,","")); // Make sure to create the Oozie DB during unit tests oozieSiteConf.set(JPAService.CONF_CREATE_DB_SCHEMA, "true"); File target = new File(testCaseConfDir, "oozie-site.xml"); oozieSiteConf.writeXml(new FileOutputStream(target)); - final File hadoopConfDir = new File(testCaseConfDir, "hadoop-conf"); + File hadoopConfDir = new File(testCaseConfDir, "hadoop-conf"); hadoopConfDir.mkdir(); - final File actionConfDir = new File(testCaseConfDir, "action-conf"); + File actionConfDir = new File(testCaseConfDir, "action-conf"); actionConfDir.mkdir(); source = new File(OOZIE_SRC_DIR, "core/src/test/resources/hadoop-config.xml"); InputStream hadoopConfigResourceStream = null; if (!source.exists()) { // If we can't find it, try using the class loader (useful if we're using XTestCase from outside core) - final URL sourceURL = getClass().getClassLoader().getResource("hadoop-config.xml"); + URL sourceURL = getClass().getClassLoader().getResource("hadoop-config.xml"); if (sourceURL != null) { hadoopConfigResourceStream = sourceURL.openStream(); } @@ -409,17 +415,17 @@ public abstract class XTestCase extends TestCase { } if (System.getProperty("oozie.test.db.host") == null) { - System.setProperty("oozie.test.db.host", "localhost"); + System.setProperty("oozie.test.db.host", "localhost"); } setSystemProperty(ConfigurationService.OOZIE_DATA_DIR, testCaseDir); - setSystemProperty(HadoopAccessorService.SUPPORTED_FILESYSTEMS, "*"); + setSystemProperty(HadoopAccessorService.SUPPORTED_FILESYSTEMS,"*"); - if (yarnCluster != null) { - try (final OutputStream os = new FileOutputStream(new File(hadoopConfDir, "core-site.xml"))) { - final Configuration conf = createJobConfFromYarnCluster(); - conf.writeXml(os); - } + if (mrCluster != null) { + OutputStream os = new FileOutputStream(new File(hadoopConfDir, "core-site.xml")); + Configuration conf = createJobConfFromMRCluster(); + conf.writeXml(os); + os.close(); } if (System.getProperty("oozie.test.metastore.server", "false").equals("true")) { @@ -468,12 +474,12 @@ public abstract class XTestCase extends TestCase { * reason for the manual parsing instead of an actual File.toURI is because Oozie tests use tokens ${} * frequently. Something like URI("c:/temp/${HOUR}").toString() will generate escaped values that will break tests */ - protected String getTestCaseFileUri(final String relativeUri) { + protected String getTestCaseFileUri(String relativeUri) { String uri = new File(testCaseDir).toURI().toString(); // truncates '/' if the testCaseDir was provided with a fullpath ended with separator - if (uri.endsWith("/")) { - uri = uri.substring(0, uri.length() - 1); + if (uri.endsWith("/")){ + uri = uri.substring(0, uri.length() -1); } return uri + "/" + relativeUri; @@ -512,7 +518,7 @@ public abstract class XTestCase extends TestCase { /** * Return an alternate test user Id that belongs - to the test group. + to the test group. * * @return the user Id. */ @@ -556,7 +562,7 @@ public abstract class XTestCase extends TestCase { * @param testCase testcase instance to obtain the working directory. * @return the test working directory. */ - private String getTestCaseDirInternal(final TestCase testCase) { + private String getTestCaseDirInternal(TestCase testCase) { ParamChecker.notNull(testCase, "testCase"); File dir = new File(System.getProperty(OOZIE_TEST_DIR, "target/test-data")); dir = new File(dir, "oozietests").getAbsoluteFile(); @@ -565,16 +571,16 @@ public abstract class XTestCase extends TestCase { return dir.getAbsolutePath(); } - protected void delete(final File file) throws IOException { + protected void delete(File file) throws IOException { ParamChecker.notNull(file, "file"); if (file.getAbsolutePath().length() < 5) { throw new RuntimeException(XLog.format("path [{0}] is too short, not deleting", file.getAbsolutePath())); } if (file.exists()) { if (file.isDirectory()) { - final File[] children = file.listFiles(); + File[] children = file.listFiles(); if (children != null) { - for (final File child : children) { + for (File child : children) { delete(child); } } @@ -598,14 +604,14 @@ public abstract class XTestCase extends TestCase { * @return return the path of the test working directory, it is always an absolute path. * @throws Exception if the test working directory could not be created or cleaned up. */ - private String createTestCaseDir(final TestCase testCase, final boolean cleanup) throws Exception { - final String testCaseDir = getTestCaseDirInternal(testCase); + private String createTestCaseDir(TestCase testCase, boolean cleanup) throws Exception { + String testCaseDir = getTestCaseDirInternal(testCase); System.out.println(); System.out.println(XLog.format("Setting testcase work dir[{0}]", testCaseDir)); if (cleanup) { delete(new File(testCaseDir)); } - final File dir = new File(testCaseDir); + File dir = new File(testCaseDir); if (!dir.mkdirs()) { throw new RuntimeException(XLog.format("Could not create testcase dir[{0}]", testCaseDir)); } @@ -618,7 +624,7 @@ public abstract class XTestCase extends TestCase { * @param subDirNames a list of progressively deeper directory names * @return the absolute path to the created directory. */ - protected String createTestCaseSubDir(final String... subDirNames) { + protected String createTestCaseSubDir(String... subDirNames) { ParamChecker.notNull(subDirNames, "subDirName"); if (subDirNames.length == 0) { throw new RuntimeException(XLog.format("Could not create testcase subdir ''; it already exists")); @@ -644,12 +650,12 @@ public abstract class XTestCase extends TestCase { * @param name system property name. * @param value value to set. */ - protected void setSystemProperty(final String name, final String value) { + protected void setSystemProperty(String name, String value) { if (sysProps == null) { sysProps = new HashMap<String, String>(); } if (!sysProps.containsKey(name)) { - final String currentValue = System.getProperty(name); + String currentValue = System.getProperty(name); sysProps.put(name, currentValue); } if (value != null) { @@ -665,7 +671,7 @@ public abstract class XTestCase extends TestCase { */ private void resetSystemProperties() { if (sysProps != null) { - for (final Map.Entry<String, String> entry : sysProps.entrySet()) { + for (Map.Entry<String, String> entry : sysProps.entrySet()) { if (entry.getValue() != null) { System.setProperty(entry.getKey(), entry.getValue()); } @@ -698,11 +704,11 @@ public abstract class XTestCase extends TestCase { * @param predicate predicate waiting on. * @return the waited time. */ - protected long waitFor(final int timeout, final Predicate predicate) { + protected long waitFor(int timeout, Predicate predicate) { ParamChecker.notNull(predicate, "predicate"); - final XLog log = new XLog(LogFactory.getLog(getClass())); - final long started = System.currentTimeMillis(); - final long mustEnd = System.currentTimeMillis() + (long) (WAITFOR_RATIO * timeout); + XLog log = new XLog(LogFactory.getLog(getClass())); + long started = System.currentTimeMillis(); + long mustEnd = System.currentTimeMillis() + (long)(WAITFOR_RATIO * timeout); long lastEcho = 0; try { long waiting = mustEnd - System.currentTimeMillis(); @@ -720,7 +726,8 @@ public abstract class XTestCase extends TestCase { log.info("Waiting timed out after [{0}] msec", timeout); } return System.currentTimeMillis() - started; - } catch (final Exception ex) { + } + catch (Exception ex) { throw new RuntimeException(ex); } } @@ -730,7 +737,7 @@ public abstract class XTestCase extends TestCase { * * @param sleepTime time in milliseconds to wait */ - protected void sleep(final int sleepTime) { + protected void sleep(int sleepTime) { waitFor(sleepTime, new Predicate() { @Override public boolean evaluate() throws Exception { @@ -770,7 +777,7 @@ public abstract class XTestCase extends TestCase { } public String getKeytabFile() { - final String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath(); + String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath(); return System.getProperty("oozie.test.kerberos.keytab.file", defaultFile); } @@ -780,7 +787,7 @@ public abstract class XTestCase extends TestCase { public String getOoziePrincipal() { return System.getProperty("oozie.test.kerberos.oozie.principal", - getOozieUser() + "/localhost") + "@" + getRealm(); + getOozieUser() + "/localhost") + "@" + getRealm(); } protected MiniHCatServer getHCatalogServer() { @@ -804,11 +811,12 @@ public abstract class XTestCase extends TestCase { // needed to cleanup the database and shut them down when done; the test will likely start its own Services later and // we don't want to interfere try { - final Services services = new Services(); + Services services = new Services(); services.getConf().set(Services.CONF_SERVICE_CLASSES, MINIMAL_SERVICES_FOR_DB_CLEANUP); services.init(); cleanUpDBTablesInternal(); - } finally { + } + finally { if (Services.get() != null) { Services.get().destroy(); } @@ -817,70 +825,70 @@ public abstract class XTestCase extends TestCase { } private void cleanUpDBTablesInternal() throws StoreException { - final EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager(); + EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager(); entityManager.setFlushMode(FlushModeType.COMMIT); entityManager.getTransaction().begin(); Query q = entityManager.createNamedQuery("GET_WORKFLOWS"); - final List<WorkflowJobBean> wfjBeans = q.getResultList(); - final int wfjSize = wfjBeans.size(); - for (final WorkflowJobBean w : wfjBeans) { + List<WorkflowJobBean> wfjBeans = q.getResultList(); + int wfjSize = wfjBeans.size(); + for (WorkflowJobBean w : wfjBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_ACTIONS"); - final List<WorkflowActionBean> wfaBeans = q.getResultList(); - final int wfaSize = wfaBeans.size(); - for (final WorkflowActionBean w : wfaBeans) { + List<WorkflowActionBean> wfaBeans = q.getResultList(); + int wfaSize = wfaBeans.size(); + for (WorkflowActionBean w : wfaBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_COORD_JOBS"); - final List<CoordinatorJobBean> cojBeans = q.getResultList(); - final int cojSize = cojBeans.size(); - for (final CoordinatorJobBean w : cojBeans) { + List<CoordinatorJobBean> cojBeans = q.getResultList(); + int cojSize = cojBeans.size(); + for (CoordinatorJobBean w : cojBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_COORD_ACTIONS"); - final List<CoordinatorActionBean> coaBeans = q.getResultList(); - final int coaSize = coaBeans.size(); - for (final CoordinatorActionBean w : coaBeans) { + List<CoordinatorActionBean> coaBeans = q.getResultList(); + int coaSize = coaBeans.size(); + for (CoordinatorActionBean w : coaBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_BUNDLE_JOBS"); - final List<BundleJobBean> bjBeans = q.getResultList(); - final int bjSize = bjBeans.size(); - for (final BundleJobBean w : bjBeans) { + List<BundleJobBean> bjBeans = q.getResultList(); + int bjSize = bjBeans.size(); + for (BundleJobBean w : bjBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_BUNDLE_ACTIONS"); - final List<BundleActionBean> baBeans = q.getResultList(); - final int baSize = baBeans.size(); - for (final BundleActionBean w : baBeans) { + List<BundleActionBean> baBeans = q.getResultList(); + int baSize = baBeans.size(); + for (BundleActionBean w : baBeans) { entityManager.remove(w); } q = entityManager.createNamedQuery("GET_SLA_EVENTS"); - final List<SLAEventBean> slaBeans = q.getResultList(); - final int slaSize = slaBeans.size(); - for (final SLAEventBean w : slaBeans) { + List<SLAEventBean> slaBeans = q.getResultList(); + int slaSize = slaBeans.size(); + for (SLAEventBean w : slaBeans) { entityManager.remove(w); } q = entityManager.createQuery("select OBJECT(w) from SLARegistrationBean w"); - final List<SLARegistrationBean> slaRegBeans = q.getResultList(); - final int slaRegSize = slaRegBeans.size(); - for (final SLARegistrationBean w : slaRegBeans) { + List<SLARegistrationBean> slaRegBeans = q.getResultList(); + int slaRegSize = slaRegBeans.size(); + for (SLARegistrationBean w : slaRegBeans) { entityManager.remove(w); } q = entityManager.createQuery("select OBJECT(w) from SLASummaryBean w"); - final List<SLASummaryBean> sdBeans = q.getResultList(); - final int ssSize = sdBeans.size(); - for (final SLASummaryBean w : sdBeans) { + List<SLASummaryBean> sdBeans = q.getResultList(); + int ssSize = sdBeans.size(); + for (SLASummaryBean w : sdBeans) { entityManager.remove(w); } @@ -900,49 +908,58 @@ public abstract class XTestCase extends TestCase { private static MiniDFSCluster dfsCluster = null; private static MiniDFSCluster dfsCluster2 = null; - private static MiniYARNCluster yarnCluster = null; + // TODO: OYA: replace with MiniYarnCluster or MiniMRYarnCluster + private static MiniMRCluster mrCluster = null; private static MiniHCatServer hcatServer = null; private static MiniHS2 hiveserver2 = null; private static HiveConf hs2Config = null; - private void setUpEmbeddedHadoop(final String testCaseDir) throws Exception { - if (dfsCluster == null && yarnCluster == null) { - if (System.getProperty("hadoop.log.dir") == null) { - System.setProperty("hadoop.log.dir", testCaseDir); - } + private void setUpEmbeddedHadoop(String testCaseDir) throws Exception { + if (dfsCluster == null && mrCluster == null) { + if (System.getProperty("hadoop.log.dir") == null) { + System.setProperty("hadoop.log.dir", testCaseDir); + } // Tell the ClasspathUtils that we're using a mini cluster ClasspathUtils.setUsingMiniYarnCluster(true); - final int dataNodes = 2; - final String oozieUser = getOozieUser(); - final JobConf dfsConfig = createDFSConfig(); - final String[] userGroups = new String[]{getTestGroup(), getTestGroup2()}; + int taskTrackers = 2; + int dataNodes = 2; + String oozieUser = getOozieUser(); + JobConf conf = createDFSConfig(); + String[] userGroups = new String[] { getTestGroup(), getTestGroup2() }; UserGroupInformation.createUserForTesting(oozieUser, userGroups); UserGroupInformation.createUserForTesting(getTestUser(), userGroups); UserGroupInformation.createUserForTesting(getTestUser2(), userGroups); - UserGroupInformation.createUserForTesting(getTestUser3(), new String[]{"users"}); + UserGroupInformation.createUserForTesting(getTestUser3(), new String[] { "users" } ); try { - dfsCluster = new MiniDFSCluster.Builder(dfsConfig) - .numDataNodes(dataNodes) - .format(true) - .racks(null) - .build(); - - createHdfsPathsAndSetupPermissions(); - - final Configuration yarnConfig = createYarnConfig(dfsConfig); - yarnCluster = new MiniYARNCluster(this.getClass().getName(), 1, 1, 1, 1); - yarnCluster.init(yarnConfig); - yarnCluster.start(); - final JobConf jobConf = new JobConf(yarnCluster.getConfig()); + dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null); + FileSystem fileSystem = dfsCluster.getFileSystem(); + fileSystem.mkdirs(new Path("target/test-data")); + fileSystem.mkdirs(new Path("target/test-data"+"/minicluster/mapred")); + fileSystem.mkdirs(new Path("/user")); + fileSystem.mkdirs(new Path("/tmp")); + fileSystem.mkdirs(new Path("/hadoop/mapred/system")); + fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("target/test-data"+"/minicluster"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("target/test-data"+"/minicluster/mapred"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); + fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------")); + String nnURI = fileSystem.getUri().toString(); + int numDirs = 1; + String[] racks = null; + String[] hosts = null; + mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf); + JobConf jobConf = mrCluster.createJobConf(); System.setProperty(OOZIE_TEST_JOB_TRACKER, jobConf.get("mapred.job.tracker")); - final String rmAddress = jobConf.get("yarn.resourcemanager.address"); + String rmAddress = jobConf.get("yarn.resourcemanager.address"); if (rmAddress != null) { System.setProperty(OOZIE_TEST_JOB_TRACKER, rmAddress); } - System.setProperty(OOZIE_TEST_NAME_NODE, dfsCluster.getFileSystem().getUri().toString()); - ProxyUsers.refreshSuperUserGroupsConfiguration(dfsConfig); - } catch (final Exception ex) { + System.setProperty(OOZIE_TEST_NAME_NODE, jobConf.get("fs.default.name")); + ProxyUsers.refreshSuperUserGroupsConfiguration(conf); + } + catch (Exception ex) { shutdownMiniCluster(); throw ex; } @@ -950,32 +967,15 @@ public abstract class XTestCase extends TestCase { } } - private void createHdfsPathsAndSetupPermissions() throws IOException { - final FileSystem fileSystem = dfsCluster.getFileSystem(); - - fileSystem.mkdirs(new Path("target/test-data")); - fileSystem.mkdirs(new Path("target/test-data" + "/minicluster/mapred")); - fileSystem.mkdirs(new Path("/user")); - fileSystem.mkdirs(new Path("/tmp")); - fileSystem.mkdirs(new Path("/hadoop/mapred/system")); - - fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("target/test-data" + "/minicluster"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("target/test-data" + "/minicluster/mapred"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); - fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------")); - } - private void setUpEmbeddedHadoop2() throws Exception { if (dfsCluster != null && dfsCluster2 == null) { // Trick dfs location for MiniDFSCluster since it doesn't accept location as input) - final String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data"); + String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data"); try { System.setProperty("test.build.data", FilenameUtils.concat(testBuildDataSaved, "2")); // Only DFS cluster is created based upon current need dfsCluster2 = new MiniDFSCluster(createDFSConfig(), 2, true, null); - final FileSystem fileSystem = dfsCluster2.getFileSystem(); + FileSystem fileSystem = dfsCluster2.getFileSystem(); fileSystem.mkdirs(new Path("target/test-data")); fileSystem.mkdirs(new Path("/user")); fileSystem.mkdirs(new Path("/tmp")); @@ -983,10 +983,12 @@ public abstract class XTestCase extends TestCase { fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx")); fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx")); System.setProperty(OOZIE_TEST_NAME_NODE2, fileSystem.getConf().get("fs.default.name")); - } catch (final Exception ex) { + } + catch (Exception ex) { shutdownMiniCluster2(); throw ex; - } finally { + } + finally { // Restore previus value System.setProperty("test.build.data", testBuildDataSaved); } @@ -994,41 +996,31 @@ public abstract class XTestCase extends TestCase { } private JobConf createDFSConfig() throws UnknownHostException { - final JobConf conf = new JobConf(); - conf.set("dfs.block.access.token.enable", "false"); - conf.set("dfs.permissions", "true"); - conf.set("hadoop.security.authentication", "simple"); - - //Doing this because Hadoop 1.x does not support '*' if the value is '*,127.0.0.1' - final StringBuilder sb = new StringBuilder(); - sb.append("127.0.0.1,localhost"); - for (final InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) { - sb.append(",").append(i.getCanonicalHostName()); - } - conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString()); - - conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", getTestGroup()); - conf.set("mapred.tasktracker.map.tasks.maximum", "4"); - conf.set("mapred.tasktracker.reduce.tasks.maximum", "4"); - - conf.set("hadoop.tmp.dir", "target/test-data" + "/minicluster"); - - // Scheduler properties required for YARN CapacityScheduler to work - conf.set("yarn.scheduler.capacity.root.queues", "default"); - conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); - // Required to prevent deadlocks with YARN CapacityScheduler - conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5"); - - return conf; - } - - private Configuration createYarnConfig(final Configuration parentConfig) { - final Configuration yarnConfig = new YarnConfiguration(parentConfig); - - yarnConfig.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); - yarnConfig.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); - - return yarnConfig; + JobConf conf = new JobConf(); + conf.set("dfs.block.access.token.enable", "false"); + conf.set("dfs.permissions", "true"); + conf.set("hadoop.security.authentication", "simple"); + + //Doing this because Hadoop 1.x does not support '*' if the value is '*,127.0.0.1' + StringBuilder sb = new StringBuilder(); + sb.append("127.0.0.1,localhost"); + for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) { + sb.append(",").append(i.getCanonicalHostName()); + } + conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString()); + + conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", getTestGroup()); + conf.set("mapred.tasktracker.map.tasks.maximum", "4"); + conf.set("mapred.tasktracker.reduce.tasks.maximum", "4"); + + conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster"); + + // Scheduler properties required for YARN CapacityScheduler to work + conf.set("yarn.scheduler.capacity.root.queues", "default"); + conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); + // Required to prevent deadlocks with YARN CapacityScheduler + conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5"); + return conf; } protected void setupHCatalogServer() throws Exception { @@ -1055,8 +1047,8 @@ public abstract class XTestCase extends TestCase { if (hs2Config == null) { // Make HS2 use our Mini cluster by copying all configs to HiveConf; also had to hack MiniHS2 hs2Config = new HiveConf(); - final Configuration jobConf = createJobConf(); - for (final Map.Entry<String, String> pair : jobConf) { + Configuration jobConf = createJobConf(); + for (Map.Entry<String, String> pair : jobConf) { hs2Config.set(pair.getKey(), pair.getValue()); } } @@ -1078,32 +1070,27 @@ public abstract class XTestCase extends TestCase { return hiveserver2.getJdbcURL(); } - protected String getHiveServer2JdbcURL(final String dbName) { + protected String getHiveServer2JdbcURL(String dbName) { return hiveserver2.getJdbcURL(dbName); } private static void shutdownMiniCluster() { try { - if (yarnCluster != null) { - final YarnJobActions yarnJobActions = - new YarnJobActions.Builder(yarnCluster.getConfig(), ApplicationsRequestScope.ALL) - .build(); - final Set<ApplicationId> allYarnJobs = yarnJobActions.getYarnJobs(); - - yarnJobActions.killSelectedYarnJobs(allYarnJobs); - - yarnCluster.stop(); + if (mrCluster != null) { + mrCluster.shutdown(); } - } catch (final Exception ex) { - System.out.println(ex.getMessage()); + } + catch (Exception ex) { + System.out.println(ex); } try { if (dfsCluster != null) { dfsCluster.shutdown(); } - } catch (final Exception ex) { - System.out.println(ex.getMessage()); + } + catch (Exception ex) { + System.out.println(ex); } // This is tied to the MiniCluster because it inherits configs from there hs2Config = null; @@ -1114,11 +1101,11 @@ public abstract class XTestCase extends TestCase { if (dfsCluster2 != null) { dfsCluster2.shutdown(); } - } catch (final Exception ex) { + } + catch (Exception ex) { System.out.println(ex); } } - private static final AtomicLong LAST_TESTCASE_FINISHED = new AtomicLong(); private static final AtomicInteger RUNNING_TESTCASES = new AtomicInteger(); @@ -1129,7 +1116,7 @@ public abstract class XTestCase extends TestCase { } public void run() { - final long shutdownWait = Long.parseLong(System.getProperty(TEST_MINICLUSTER_MONITOR_SHUTDOWN_WAIT, "10")) * 1000; + long shutdownWait = Long.parseLong(System.getProperty(TEST_MINICLUSTER_MONITOR_SHUTDOWN_WAIT, "10")) * 1000; LAST_TESTCASE_FINISHED.set(System.currentTimeMillis()); while (true) { if (RUNNING_TESTCASES.get() == 0) { @@ -1139,7 +1126,8 @@ public abstract class XTestCase extends TestCase { } try { Thread.sleep(1000); - } catch (final InterruptedException ex) { + } + catch (InterruptedException ex) { break; } } @@ -1149,10 +1137,10 @@ public abstract class XTestCase extends TestCase { } @SuppressWarnings("deprecation") - private JobConf createJobConfFromYarnCluster() { - final JobConf jobConf = new JobConf(); - final JobConf jobConfYarn = new JobConf(yarnCluster.getConfig()); - for (final Entry<String, String> entry : jobConfYarn) { + private JobConf createJobConfFromMRCluster() { + JobConf jobConf = new JobConf(); + JobConf jobConfMR = mrCluster.createJobConf(); + for ( Entry<String, String> entry : jobConfMR) { // MiniMRClientClusterFactory sets the job jar in Hadoop 2.0 causing tests to fail // TODO call conf.unset after moving completely to Hadoop 2.x if (!(entry.getKey().equals("mapreduce.job.jar") || entry.getKey().equals("mapred.jar"))) { @@ -1167,16 +1155,15 @@ public abstract class XTestCase extends TestCase { * @return a jobconf preconfigured to talk with the test cluster/minicluster. */ protected JobConf createJobConf() throws IOException { - final JobConf jobConf; - - if (yarnCluster != null) { - jobConf = createJobConfFromYarnCluster(); - } else { + JobConf jobConf; + if (mrCluster != null) { + jobConf = createJobConfFromMRCluster(); + } + else { jobConf = new JobConf(); jobConf.set("mapred.job.tracker", getJobTrackerUri()); jobConf.set("fs.default.name", getNameNodeUri()); } - return jobConf; } @@ -1199,22 +1186,29 @@ public abstract class XTestCase extends TestCase { * * @param executable The ShutdownJobTrackerExecutable to execute while the JobTracker is shutdown */ - protected void executeWhileJobTrackerIsShutdown(final ShutdownJobTrackerExecutable executable) { + protected void executeWhileJobTrackerIsShutdown(ShutdownJobTrackerExecutable executable) { + mrCluster.stopJobTracker(); + Exception ex = null; try { executable.execute(); - } catch (final Exception e) { - throw new RuntimeException(e); + } catch (Exception e) { + ex = e; + } finally { + mrCluster.startJobTracker(); + } + if (ex != null) { + throw new RuntimeException(ex); } } protected Services setupServicesForHCatalog() throws ServiceException { - final Services services = new Services(); + Services services = new Services(); setupServicesForHCataLogImpl(services); return services; } - private void setupServicesForHCataLogImpl(final Services services) { - final Configuration conf = services.getConf(); + private void setupServicesForHCataLogImpl(Services services) { + Configuration conf = services.getConf(); conf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + "," + PartitionDependencyManagerService.class.getName() + "," + @@ -1222,31 +1216,31 @@ public abstract class XTestCase extends TestCase { conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#" + localActiveMQBroker + - "connectionFactoryNames#" + "ConnectionFactory"); + "connectionFactoryNames#"+ "ConnectionFactory"); conf.set(URIHandlerService.URI_HANDLERS, FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName()); setSystemProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); setSystemProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); } - protected Services setupServicesForHCatalog(final Services services) throws ServiceException { + protected Services setupServicesForHCatalog(Services services) throws ServiceException { setupServicesForHCataLogImpl(services); return services; } - protected YarnApplicationState waitUntilYarnAppState(final String externalId, final EnumSet<YarnApplicationState> acceptedStates) + protected YarnApplicationState waitUntilYarnAppState(String externalId, final EnumSet<YarnApplicationState> acceptedStates) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>(); - final JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf); try { waitFor(60 * 1000, new Predicate() { @Override public boolean evaluate() throws Exception { - final YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState(); + YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState(); finalState.setValue(state); return acceptedStates.contains(state); @@ -1262,20 +1256,20 @@ public abstract class XTestCase extends TestCase { return finalState.getValue(); } - protected void waitUntilYarnAppDoneAndAssertSuccess(final String externalId) throws HadoopAccessorException, IOException, YarnException { - final YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); + protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { + YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); assertEquals("YARN App state", YarnApplicationState.FINISHED, state); } - protected void waitUntilYarnAppKilledAndAssertSuccess(final String externalId) throws HadoopAccessorException, IOException, YarnException { - final YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); + protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { + YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); assertEquals("YARN App state", YarnApplicationState.KILLED, state); } protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); YarnApplicationState state = null; - final JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); // This is needed here because we need a mutable final YarnClient final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null); try {
