This is an automated email from the ASF dual-hosted git repository. sseth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
commit 4bbd3c2e7600752cfe0d074d61e00e12ea0ee748 Author: Siddharth Seth <[email protected]> AuthorDate: Mon May 6 10:51:43 2019 -0700 Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than" This reverts commit 46b4004d97dd2f2cde491a93abcdd48c9b82f68e. --- .../org/apache/tez/common/TezUtilsInternal.java | 41 +++++++++- .../java/org/apache/tez/client/LocalClient.java | 53 ++++++------ .../org/apache/tez/examples/TezExampleBase.java | 3 +- .../java/org/apache/tez/test/TestLocalMode.java | 93 +++++++--------------- 4 files changed, 97 insertions(+), 93 deletions(-) diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index adcae8a..5d7aea3 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -71,10 +71,20 @@ public class TezUtilsInternal { public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws IOException { - File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME); - try (FileInputStream fis = new FileInputStream(confPBFile)) { - return ConfigurationProto.parseFrom(fis); + FileInputStream confPBBinaryStream = null; + ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); + try { + confPBBinaryStream = + new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME)); + confProtoBuilder.mergeFrom(confPBBinaryStream); + } finally { + if (confPBBinaryStream != null) { + confPBBinaryStream.close(); + } } + + ConfigurationProto confProto = confProtoBuilder.build(); + return confProto; } public static void addUserSpecifiedTezConfiguration(Configuration conf, @@ -85,6 +95,31 @@ public class TezUtilsInternal { } } } +// +// public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws +// IOException { +// FileInputStream confPBBinaryStream = null; +// ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); +// try { +// confPBBinaryStream = +// new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME)); +// confProtoBuilder.mergeFrom(confPBBinaryStream); +// } finally { +// if (confPBBinaryStream != null) { +// confPBBinaryStream.close(); +// } +// } +// +// ConfigurationProto confProto = confProtoBuilder.build(); +// +// List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList(); +// if (kvPairList != null && !kvPairList.isEmpty()) { +// for (PlanKeyValuePair kvPair : kvPairList) { +// conf.set(kvPair.getKey(), kvPair.getValue()); +// } +// } +// } + public static byte[] compressBytes(byte[] inBytes) throws IOException { StopWatch sw = new StopWatch().start(); diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 140ada1..6baea48 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -24,8 +24,6 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; -import javax.annotation.Nullable; - import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -85,6 +83,7 @@ public class LocalClient extends FrameworkClient { @Override public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) { this.conf = tezConf; + tezConf.set("fs.defaultFS", "file:///"); // Tez libs already in the client's classpath this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true); this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName); @@ -287,34 +286,19 @@ public class LocalClient extends FrameworkClient { try { ApplicationId appId = appContext.getApplicationId(); - // Set up working directory for DAGAppMaster. - // The staging directory may be on the default file system, which may or may not - // be the local FS. For example, when using testing Hive against a pseudo-distributed - // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch - // directories on HDFS, and sets the Tez staging directory to be the session's - // scratch directory. - // - // To handle this case, we need to copy over the staging data back onto the - // local file system, where the rest of the Tez Child code expects it. - // - // NOTE: we base the local working directory path off of the staging path, even - // though it might be on a different file system. Typically they're both in a - // path starting with /tmp, but in the future we may want to use a different - // temp directory locally. + // Set up working directory for DAGAppMaster Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()); - FileSystem stagingFs = staging.getFileSystem(conf); - - FileSystem localFs = FileSystem.getLocal(conf); - Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd")); + Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd"); LOG.info("Using working directory: " + userDir.toUri().getPath()); + FileSystem fs = FileSystem.get(conf); // copy data from staging directory to working directory to simulate the resource localizing - FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf); + FileUtil.copy(fs, staging, fs, userDir, false, conf); // Prepare Environment Path logDir = new Path(userDir, "localmode-log-dir"); Path localDir = new Path(userDir, "localmode-local-dir"); - localFs.mkdirs(logDir); - localFs.mkdirs(localDir); + fs.mkdirs(logDir); + fs.mkdirs(localDir); UserGroupInformation.setConfiguration(conf); // Add session specific credentials to the AM credentials. @@ -373,11 +357,30 @@ public class LocalClient extends FrameworkClient { // Read in additional information about external services AMPluginDescriptorProto amPluginDescriptorProto = - TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir) - .getAmPluginDescriptor(); + getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString()); + return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs, versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto); } + + private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf, + String applicationIdString) throws + IOException { + Path tezSysStagingPath = TezCommonUtils + .getTezSystemStagingPath(conf, applicationIdString); + // Remove the filesystem qualifier. + String unqualifiedPath = tezSysStagingPath.toUri().getPath(); + + DAGProtos.ConfigurationProto confProto = + TezUtilsInternal + .readUserSpecifiedTezConfiguration(unqualifiedPath); + AMPluginDescriptorProto amPluginDescriptorProto = null; + if (confProto.hasAmPluginDescriptor()) { + amPluginDescriptorProto = confProto.getAmPluginDescriptor(); + } + return amPluginDescriptorProto; + } + } diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index cb52105..6b626b1 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -276,7 +276,8 @@ public abstract class TezExampleBase extends Configured implements Tool { protected void printExtraOptionsUsage(PrintStream ps) { ps.println("Tez example extra options supported are"); - ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, " + // TODO TEZ-1348 make it able to access dfs in tez local mode + ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode," + " run it in distributed mode without this option"); ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput," + " enable split grouping without this option."); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java index 318349c..2a5b65f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java @@ -20,16 +20,12 @@ package org.apache.tez.test; import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -47,73 +43,23 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.processor.SleepProcessor; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import static org.junit.Assert.*; -@RunWith(Parameterized.class) public class TestLocalMode { private static final File TEST_DIR = new File( System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode"); - private static MiniDFSCluster dfsCluster; - private static FileSystem remoteFs; - - @Parameterized.Parameter - public boolean useDfs; - - @Parameterized.Parameters(name = "useDFS:{0}") - public static Collection<Object[]> params() { - return Arrays.asList(new Object[][]{{ false }, { true }}); - } - - - @BeforeClass - public static void beforeClass() throws Exception { - try { - Configuration conf = new Configuration(); - dfsCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true) - .racks(null).build(); - remoteFs = dfsCluster.getFileSystem(); - } catch (IOException io) { - throw new RuntimeException("problem starting mini dfs cluster", io); - } - } - - @AfterClass - public static void afterClass() throws InterruptedException { - if (dfsCluster != null) { - try { - dfsCluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - private TezConfiguration createConf() { - TezConfiguration conf = new TezConfiguration(); - conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - if (useDfs) { - conf.set("fs.defaultFS", remoteFs.getUri().toString()); - } else { - conf.set("fs.defaultFS", "file:///"); - } - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); - return conf; - } - @Test(timeout = 30000) public void testMultipleClientsWithSession() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = createConf(); + TezConfiguration tezConf1 = new TezConfiguration(); + tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf1.set("fs.defaultFS", "file:///"); + tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); TezClient tezClient1 = TezClient.create("commonName", tezConf1, true); tezClient1.start(); @@ -126,7 +72,11 @@ public class TestLocalMode { dagClient1.close(); tezClient1.stop(); - TezConfiguration tezConf2 = createConf(); + + TezConfiguration tezConf2 = new TezConfiguration(); + tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf2.set("fs.defaultFS", "file:///"); + tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); TezClient tezClient2 = TezClient.create("commonName", tezConf2, true); tezClient2.start(); @@ -141,7 +91,10 @@ public class TestLocalMode { @Test(timeout = 10000) public void testMultipleClientsWithoutSession() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = createConf(); + TezConfiguration tezConf1 = new TezConfiguration(); + tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf1.set("fs.defaultFS", "file:///"); + tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); @@ -155,7 +108,10 @@ public class TestLocalMode { tezClient1.stop(); - TezConfiguration tezConf2 = createConf(); + TezConfiguration tezConf2 = new TezConfiguration(); + tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf2.set("fs.defaultFS", "file:///"); + tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); TezClient tezClient2 = TezClient.create("commonName", tezConf2, false); tezClient2.start(); @@ -170,7 +126,10 @@ public class TestLocalMode { @Test(timeout = 20000) public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = createConf(); + TezConfiguration tezConf1 = new TezConfiguration(); + tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf1.set("fs.defaultFS", "file:///"); + tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); // Run in non-session mode so that the AM terminates TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); @@ -191,7 +150,10 @@ public class TestLocalMode { @Test(timeout = 20000) public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException, IOException { - TezConfiguration tezConf1 = createConf(); + TezConfiguration tezConf1 = new TezConfiguration(); + tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf1.set("fs.defaultFS", "file:///"); + tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); // Run in non-session mode so that the AM terminates TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); @@ -249,7 +211,10 @@ public class TestLocalMode { String[] outputPaths = new String[dags]; DAGClient[] dagClients = new DAGClient[dags]; - TezConfiguration tezConf = createConf(); + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf.set("fs.defaultFS", "file:///"); + tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true); tezClient.start();
