This is an automated email from the ASF dual-hosted git repository. sseth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
commit 82c1d75c7b3abf7316ba265a599b910fcc2360c7 Author: Siddharth Seth <[email protected]> AuthorDate: Mon May 6 10:52:19 2019 -0700 TEZ-1348. Allow Tez local mode to run against filesystems other than local FS. (Todd Lipcon via sseth) --- .../org/apache/tez/common/TezUtilsInternal.java | 41 +-------- .../java/org/apache/tez/client/LocalClient.java | 53 ++++++------ .../org/apache/tez/examples/TezExampleBase.java | 3 +- .../java/org/apache/tez/test/TestLocalMode.java | 98 +++++++++++++++------- 4 files changed, 97 insertions(+), 98 deletions(-) diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 5d7aea3..adcae8a 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -71,20 +71,10 @@ public class TezUtilsInternal { public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws IOException { - FileInputStream confPBBinaryStream = null; - ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); - try { - confPBBinaryStream = - new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME)); - confProtoBuilder.mergeFrom(confPBBinaryStream); - } finally { - if (confPBBinaryStream != null) { - confPBBinaryStream.close(); - } + File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME); + try (FileInputStream fis = new FileInputStream(confPBFile)) { + return ConfigurationProto.parseFrom(fis); } - - ConfigurationProto confProto = confProtoBuilder.build(); - return confProto; } public static void addUserSpecifiedTezConfiguration(Configuration conf, @@ -95,31 +85,6 @@ public class TezUtilsInternal { } } } -// -// public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws -// IOException { -// FileInputStream confPBBinaryStream = null; -// 
ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); -// try { -// confPBBinaryStream = -// new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME)); -// confProtoBuilder.mergeFrom(confPBBinaryStream); -// } finally { -// if (confPBBinaryStream != null) { -// confPBBinaryStream.close(); -// } -// } -// -// ConfigurationProto confProto = confProtoBuilder.build(); -// -// List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList(); -// if (kvPairList != null && !kvPairList.isEmpty()) { -// for (PlanKeyValuePair kvPair : kvPairList) { -// conf.set(kvPair.getKey(), kvPair.getValue()); -// } -// } -// } - public static byte[] compressBytes(byte[] inBytes) throws IOException { StopWatch sw = new StopWatch().start(); diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 6baea48..9006971 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; + import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -50,7 +51,6 @@ import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClientHandler; -import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.DAGAppMaster; @@ -83,7 +83,6 @@ public class LocalClient extends FrameworkClient { @Override public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) { this.conf = tezConf; - tezConf.set("fs.defaultFS", 
"file:///"); // Tez libs already in the client's classpath this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true); this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName); @@ -286,19 +285,34 @@ public class LocalClient extends FrameworkClient { try { ApplicationId appId = appContext.getApplicationId(); - // Set up working directory for DAGAppMaster + // Set up working directory for DAGAppMaster. + // The staging directory may be on the default file system, which may or may not + // be the local FS. For example, when using testing Hive against a pseudo-distributed + // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch + // directories on HDFS, and sets the Tez staging directory to be the session's + // scratch directory. + // + // To handle this case, we need to copy over the staging data back onto the + // local file system, where the rest of the Tez Child code expects it. + // + // NOTE: we base the local working directory path off of the staging path, even + // though it might be on a different file system. Typically they're both in a + // path starting with /tmp, but in the future we may want to use a different + // temp directory locally. 
Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()); - Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd"); + FileSystem stagingFs = staging.getFileSystem(conf); + + FileSystem localFs = FileSystem.getLocal(conf); + Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd")); LOG.info("Using working directory: " + userDir.toUri().getPath()); - FileSystem fs = FileSystem.get(conf); // copy data from staging directory to working directory to simulate the resource localizing - FileUtil.copy(fs, staging, fs, userDir, false, conf); + FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf); // Prepare Environment Path logDir = new Path(userDir, "localmode-log-dir"); Path localDir = new Path(userDir, "localmode-local-dir"); - fs.mkdirs(logDir); - fs.mkdirs(localDir); + localFs.mkdirs(logDir); + localFs.mkdirs(localDir); UserGroupInformation.setConfiguration(conf); // Add session specific credentials to the AM credentials. @@ -357,30 +371,11 @@ public class LocalClient extends FrameworkClient { // Read in additional information about external services AMPluginDescriptorProto amPluginDescriptorProto = - getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString()); - + TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir) + .getAmPluginDescriptor(); return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs, versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto); } - - private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf, - String applicationIdString) throws - IOException { - Path tezSysStagingPath = TezCommonUtils - .getTezSystemStagingPath(conf, applicationIdString); - // Remove the filesystem qualifier. 
- String unqualifiedPath = tezSysStagingPath.toUri().getPath(); - - DAGProtos.ConfigurationProto confProto = - TezUtilsInternal - .readUserSpecifiedTezConfiguration(unqualifiedPath); - AMPluginDescriptorProto amPluginDescriptorProto = null; - if (confProto.hasAmPluginDescriptor()) { - amPluginDescriptorProto = confProto.getAmPluginDescriptor(); - } - return amPluginDescriptorProto; - } - } diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index 6b626b1..cb52105 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -276,8 +276,7 @@ public abstract class TezExampleBase extends Configured implements Tool { protected void printExtraOptionsUsage(PrintStream ps) { ps.println("Tez example extra options supported are"); - // TODO TEZ-1348 make it able to access dfs in tez local mode - ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode," + ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, " + " run it in distributed mode without this option"); ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput," + " enable split grouping without this option."); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java index 2a5b65f..ffc67fe 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java @@ -20,12 +20,16 @@ package org.apache.tez.test; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -43,23 +47,78 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.processor.SleepProcessor; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.junit.Assert.*; +/** + * Tests for running Tez in local execution mode (without YARN). + */ +@RunWith(Parameterized.class) public class TestLocalMode { private static final File TEST_DIR = new File( System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode"); + private static MiniDFSCluster dfsCluster; + private static FileSystem remoteFs; + + private final boolean useDfs; + + @Parameterized.Parameters(name = "useDFS:{0}") + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][]{{false}, {true}}); + } + + public TestLocalMode(boolean useDfs) { + this.useDfs = useDfs; + } + + @BeforeClass + public static void beforeClass() throws Exception { + try { + Configuration conf = new Configuration(); + dfsCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true) + .racks(null).build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + } + + @AfterClass + public static void afterClass() throws InterruptedException { + if (dfsCluster != null) { + try { + dfsCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private TezConfiguration createConf() { + TezConfiguration conf = new TezConfiguration(); + 
conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + if (useDfs) { + conf.set("fs.defaultFS", remoteFs.getUri().toString()); + } else { + conf.set("fs.defaultFS", "file:///"); + } + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + return conf; + } + @Test(timeout = 30000) public void testMultipleClientsWithSession() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = new TezConfiguration(); - tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf1.set("fs.defaultFS", "file:///"); - tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf1 = createConf(); TezClient tezClient1 = TezClient.create("commonName", tezConf1, true); tezClient1.start(); @@ -72,11 +131,7 @@ public class TestLocalMode { dagClient1.close(); tezClient1.stop(); - - TezConfiguration tezConf2 = new TezConfiguration(); - tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf2.set("fs.defaultFS", "file:///"); - tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf2 = createConf(); DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); TezClient tezClient2 = TezClient.create("commonName", tezConf2, true); tezClient2.start(); @@ -91,10 +146,7 @@ public class TestLocalMode { @Test(timeout = 10000) public void testMultipleClientsWithoutSession() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = new TezConfiguration(); - tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf1.set("fs.defaultFS", "file:///"); - tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf1 = createConf(); TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); @@ -108,10 +160,7 @@ public class TestLocalMode { tezClient1.stop(); - TezConfiguration tezConf2 = new 
TezConfiguration(); - tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf2.set("fs.defaultFS", "file:///"); - tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf2 = createConf(); DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); TezClient tezClient2 = TezClient.create("commonName", tezConf2, false); tezClient2.start(); @@ -126,10 +175,7 @@ public class TestLocalMode { @Test(timeout = 20000) public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = new TezConfiguration(); - tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf1.set("fs.defaultFS", "file:///"); - tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf1 = createConf(); // Run in non-session mode so that the AM terminates TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); @@ -150,10 +196,7 @@ public class TestLocalMode { @Test(timeout = 20000) public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = new TezConfiguration(); - tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf1.set("fs.defaultFS", "file:///"); - tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf1 = createConf(); // Run in non-session mode so that the AM terminates TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); @@ -211,10 +254,7 @@ public class TestLocalMode { String[] outputPaths = new String[dags]; DAGClient[] dagClients = new DAGClient[dags]; - TezConfiguration tezConf = new TezConfiguration(); - tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf.set("fs.defaultFS", "file:///"); - 
tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezConfiguration tezConf = createConf(); TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true); tezClient.start();
