Author: gunther
Date: Sun Dec 15 23:07:32 2013
New Revision: 1551076
URL: http://svn.apache.org/r1551076
Log:
HIVE-5065: Create proper (i.e.: non .q file based) junit tests for DagUtils and
TezTask (Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
(original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Sun
Dec 15 23:07:32 2013
@@ -513,10 +513,16 @@ public abstract class Task<T extends Ser
Throwable getException() {
return exception;
}
+
void setException(Throwable ex) {
exception = ex;
}
+ public void setConsole(LogHelper console) {
+ this.console = console;
+ }
+
+ @Override
public String toString() {
return getId() + ":" + getType();
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
Sun Dec 15 23:07:32 2013
@@ -97,12 +97,13 @@ import org.apache.tez.runtime.library.ou
public class DagUtils {
private static final String TEZ_DIR = "_tez_scratch_dir";
+ private static DagUtils instance;
/*
* Creates the configuration object necessary to run a specific vertex from
* map work. This includes input formats, input processor, etc.
*/
- private static JobConf initializeVertexConf(JobConf baseConf, MapWork
mapWork) {
+ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
JobConf conf = new JobConf(baseConf);
if (mapWork.getNumMapTasks() != null) {
@@ -157,7 +158,7 @@ public class DagUtils {
* @param w The second vertex (sink)
* @return
*/
- public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex
w,
+ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
EdgeType edgeType)
throws IOException {
@@ -199,7 +200,7 @@ public class DagUtils {
/*
* Helper function to create Vertex from MapWork.
*/
- private static Vertex createVertex(JobConf conf, MapWork mapWork,
+ private Vertex createVertex(JobConf conf, MapWork mapWork,
LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
Path mrScratchDir, Context ctx) throws Exception {
@@ -295,7 +296,7 @@ public class DagUtils {
/*
* Helper function to create JobConf for specific ReduceWork.
*/
- private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork
reduceWork) {
+ private JobConf initializeVertexConf(JobConf baseConf, ReduceWork
reduceWork) {
JobConf conf = new JobConf(baseConf);
conf.set("mapred.reducer.class", ExecReducer.class.getName());
@@ -311,7 +312,7 @@ public class DagUtils {
/*
* Helper function to create Vertex for given ReduceWork.
*/
- private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
+ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
Path mrScratchDir, Context ctx) throws Exception {
@@ -351,7 +352,7 @@ public class DagUtils {
/*
* Helper method to create a yarn local resource.
*/
- private static LocalResource createLocalResource(FileSystem remoteFs, Path
file,
+ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
LocalResourceType type, LocalResourceVisibility visibility) {
FileStatus fstat = null;
@@ -381,7 +382,7 @@ public class DagUtils {
* @throws LoginException if we are unable to figure user information
* @throws IOException when any dfs operation fails.
*/
- public static Path getDefaultDestDir(Configuration conf) throws
LoginException, IOException {
+ public Path getDefaultDestDir(Configuration conf) throws LoginException,
IOException {
UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
String userPathStr = HiveConf.getVar(conf,
HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
@@ -415,7 +416,7 @@ public class DagUtils {
* @throws IOException when hdfs operation fails
* @throws LoginException when getDefaultDestDir fails with the same
exception
*/
- public static List<LocalResource> localizeTempFiles(Configuration conf)
throws IOException, LoginException {
+ public List<LocalResource> localizeTempFiles(Configuration conf) throws
IOException, LoginException {
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
String addedFiles = Utilities.getResourceFiles(conf,
SessionState.ResourceType.FILE);
@@ -471,7 +472,7 @@ public class DagUtils {
}
// the api that finds the jar being used by this class on disk
- public static String getExecJarPathLocal () throws URISyntaxException {
+ public String getExecJarPathLocal () throws URISyntaxException {
// returns the location on disc of the jar of this class.
return
DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
}
@@ -479,7 +480,7 @@ public class DagUtils {
/*
* Helper function to retrieve the basename of a local resource
*/
- public static String getBaseName(LocalResource lr) {
+ public String getBaseName(LocalResource lr) {
return FilenameUtils.getName(lr.getResource().getFile());
}
@@ -487,7 +488,7 @@ public class DagUtils {
* @param pathStr - the string from which we try to determine the resource
base name
* @return the name of the resource from a given path string.
*/
- public static String getResourceBaseName(String pathStr) {
+ public String getResourceBaseName(String pathStr) {
String[] splits = pathStr.split("/");
return splits[splits.length - 1];
}
@@ -499,7 +500,7 @@ public class DagUtils {
* @return true if the file names match else returns false.
* @throws IOException when any file system related call fails
*/
- private static boolean checkPreExisting(Path src, Path dest, Configuration
conf)
+ private boolean checkPreExisting(Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
@@ -528,7 +529,7 @@ public class DagUtils {
* @return localresource from tez localization.
* @throws IOException when any file system related calls fails.
*/
- public static LocalResource localizeResource(Path src, Path dest,
Configuration conf)
+ public LocalResource localizeResource(Path src, Path dest, Configuration
conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
if (!(destFS instanceof DistributedFileSystem)) {
@@ -557,7 +558,7 @@ public class DagUtils {
* @return JobConf base configuration for job execution
* @throws IOException
*/
- public static JobConf createConfiguration(HiveConf hiveConf) throws
IOException {
+ public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
hiveConf.setBoolean("mapred.mapper.new-api", false);
JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
@@ -585,7 +586,7 @@ public class DagUtils {
* @param work BaseWork will be used to populate the configuration object.
* @return JobConf new configuration object
*/
- public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
+ public JobConf initializeVertexConf(JobConf conf, BaseWork work) {
// simply dispatch the call to the right method for the actual (sub-) type
of
// BaseWork.
@@ -612,7 +613,7 @@ public class DagUtils {
* @param ctx This query's context
* @return Vertex
*/
- public static Vertex createVertex(JobConf conf, BaseWork work,
+ public Vertex createVertex(JobConf conf, BaseWork work,
Path scratchDir, LocalResource appJarLr, List<LocalResource>
additionalLr,
FileSystem fileSystem, Context ctx, boolean hasChildren) throws
Exception {
@@ -626,8 +627,8 @@ public class DagUtils {
v = createVertex(conf, (ReduceWork) work, appJarLr,
additionalLr, fileSystem, scratchDir, ctx);
} else {
- assert false;
- return null;
+ // something is seriously wrong if this is happening
+ throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
}
// initialize stats publisher if necessary
@@ -660,7 +661,7 @@ public class DagUtils {
* createTezDir creates a temporary directory in the scratchDir folder to
* be used with Tez. Assumes scratchDir exists.
*/
- public static Path createTezDir(Path scratchDir, Configuration conf)
+ public Path createTezDir(Path scratchDir, Configuration conf)
throws IOException {
Path tezDir = getTezDir(scratchDir);
FileSystem fs = tezDir.getFileSystem(conf);
@@ -671,10 +672,21 @@ public class DagUtils {
/**
* Gets the tez dir that belongs to the hive scratch dir
*/
- public static Path getTezDir(Path scratchDir) {
+ public Path getTezDir(Path scratchDir) {
return new Path(scratchDir, TEZ_DIR);
}
+ /**
+ * Singleton
+ * @return instance of this class
+ */
+ public static DagUtils getInstance() {
+ if (instance == null) {
+ instance = new DagUtils();
+ }
+ return instance;
+ }
+
private DagUtils() {
// don't instantiate
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Sun Dec 15 23:07:32 2013
@@ -58,6 +58,7 @@ public class TezSessionState {
private LocalResource appJarLr;
private TezSession session;
private String sessionId;
+ private DagUtils utils;
private static List<TezSessionState> openSessions
= Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -66,7 +67,16 @@ public class TezSessionState {
* Constructor. We do not automatically connect, because we only want to
* load tez classes when the user has tez installed.
*/
- public void TezSessionContext() {
+ public TezSessionState(DagUtils utils) {
+ this.utils = utils;
+ }
+
+ /**
+ * Constructor. We do not automatically connect, because we only want to
+ * load tez classes when the user has tez installed.
+ */
+ public TezSessionState() {
+ this(DagUtils.getInstance());
}
/**
@@ -112,7 +122,7 @@ public class TezSessionState {
// configuration for the application master
Map<String, LocalResource> commonLocalResources = new HashMap<String,
LocalResource>();
- commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+ commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
AMConfiguration amConfig = new AMConfiguration(null, commonLocalResources,
tezConfig, null);
@@ -211,8 +221,8 @@ public class TezSessionState {
private LocalResource createHiveExecLocalResource()
throws IOException, LoginException, URISyntaxException {
String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
- String currentVersionPathStr = DagUtils.getExecJarPathLocal();
- String currentJarName =
DagUtils.getResourceBaseName(currentVersionPathStr);
+ String currentVersionPathStr = utils.getExecJarPathLocal();
+ String currentJarName = utils.getResourceBaseName(currentVersionPathStr);
FileSystem fs = null;
Path jarPath = null;
FileStatus dirStatus = null;
@@ -234,18 +244,18 @@ public class TezSessionState {
if ((dirStatus != null) && (dirStatus.isDir())) {
FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
for (FileStatus fstatus : listFileStatus) {
- String jarName =
DagUtils.getResourceBaseName(fstatus.getPath().toString());
+ String jarName =
utils.getResourceBaseName(fstatus.getPath().toString());
if (jarName.equals(currentJarName)) {
// we have found the jar we need.
jarPath = fstatus.getPath();
- return DagUtils.localizeResource(null, jarPath, conf);
+ return utils.localizeResource(null, jarPath, conf);
}
}
// jar wasn't in the directory, copy the one in current use
if (jarPath == null) {
Path dest = new Path(hiveJarDir + "/" + currentJarName);
- return DagUtils.localizeResource(new Path(currentVersionPathStr),
dest, conf);
+ return utils.localizeResource(new Path(currentVersionPathStr), dest,
conf);
}
}
}
@@ -258,12 +268,12 @@ public class TezSessionState {
*/
if ((hiveJarDir == null) || (dirStatus == null) ||
((dirStatus != null) && (!dirStatus.isDir()))) {
- Path dest = DagUtils.getDefaultDestDir(conf);
+ Path dest = utils.getDefaultDestDir(conf);
String destPathStr = dest.toString();
String jarPathStr = destPathStr + "/" + currentJarName;
dirStatus = fs.getFileStatus(dest);
if (dirStatus.isDir()) {
- return DagUtils.localizeResource(new Path(currentVersionPathStr), new
Path(jarPathStr), conf);
+ return utils.localizeResource(new Path(currentVersionPathStr), new
Path(jarPathStr), conf);
} else {
throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Sun Dec 15 23:07:32 2013
@@ -72,8 +72,15 @@ public class TezTask extends Task<TezWor
private TezCounters counters;
+ private DagUtils utils;
+
public TezTask() {
+ this(DagUtils.getInstance());
+ }
+
+ public TezTask(DagUtils utils) {
super();
+ this.utils = utils;
}
public TezCounters getTezCounters() {
@@ -116,10 +123,10 @@ public class TezTask extends Task<TezWor
Path scratchDir = new Path(ctx.getMRScratchDir());
// create the tez tmp dir
- DagUtils.createTezDir(scratchDir, conf);
+ utils.createTezDir(scratchDir, conf);
// jobConf will hold all the configuration for hadoop, tez, and hive
- JobConf jobConf = DagUtils.createConfiguration(conf);
+ JobConf jobConf = utils.createConfiguration(conf);
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
@@ -129,7 +136,7 @@ public class TezTask extends Task<TezWor
DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
// submit will send the job to the cluster and start executing
- client = submit(jobConf, dag, scratchDir, appJarLr,
session.getSession());
+ client = submit(jobConf, dag, scratchDir, appJarLr, session);
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor();
@@ -161,7 +168,7 @@ public class TezTask extends Task<TezWor
return rc;
}
- private DAG build(JobConf conf, TezWork work, Path scratchDir,
+ DAG build(JobConf conf, TezWork work, Path scratchDir,
LocalResource appJarLr, Context ctx)
throws Exception {
@@ -170,14 +177,14 @@ public class TezTask extends Task<TezWor
Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
// we need to get the user specified local resources for this dag
- List<LocalResource> additionalLr = DagUtils.localizeTempFiles(conf);
+ List<LocalResource> additionalLr = utils.localizeTempFiles(conf);
// getAllWork returns a topologically sorted list, which we use to make
// sure that vertices are created before they are used in edges.
List<BaseWork> ws = work.getAllWork();
Collections.reverse(ws);
- Path tezDir = DagUtils.getTezDir(scratchDir);
+ Path tezDir = utils.getTezDir(scratchDir);
FileSystem fs = tezDir.getFileSystem(conf);
// the name of the dag is what is displayed in the AM/Job UI
@@ -191,8 +198,8 @@ public class TezTask extends Task<TezWor
// translate work to vertex
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX +
w.getName());
- JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
- Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
+ JobConf wxConf = utils.initializeVertexConf(conf, w);
+ Vertex wx = utils.createVertex(wxConf, w, tezDir,
appJarLr, additionalLr, fs, ctx, !isFinal);
dag.addVertex(wx);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX +
w.getName());
@@ -206,7 +213,7 @@ public class TezTask extends Task<TezWor
EdgeType edgeType = work.getEdgeProperty(w, v);
- e = DagUtils.createEdge(wxConf, wx, workToConf.get(v),
workToVertex.get(v), edgeType);
+ e = utils.createEdge(wxConf, wx, workToConf.get(v),
workToVertex.get(v), edgeType);
dag.addEdge(e);
}
}
@@ -214,8 +221,8 @@ public class TezTask extends Task<TezWor
return dag;
}
- private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
- LocalResource appJarLr, TezSession session)
+ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
+ LocalResource appJarLr, TezSessionState sessionState)
throws IOException, TezException, InterruptedException,
LoginException, URISyntaxException, HiveException {
@@ -224,25 +231,19 @@ public class TezTask extends Task<TezWor
try {
// ready to start execution on the cluster
- dagClient = session.submitDAG(dag);
+ dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
- // Need to remove this static hack. But this is the way currently to
- // get a session.
- SessionState ss = SessionState.get();
- TezSessionState tezSession = ss.getTezSession();
-
// close the old one, but keep the tmp files around
- tezSession.close(true);
+ sessionState.close(true);
// (re)open the session
- tezSession.open(ss.getSessionId(), this.conf);
- session = tezSession.getSession();
+ sessionState.open(sessionState.getSessionId(), this.conf);
console.printInfo("Session re-established.");
- dagClient = session.submitDAG(dag);
+ dagClient = sessionState.getSession().submitDAG(dag);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
@@ -253,7 +254,7 @@ public class TezTask extends Task<TezWor
* close will move the temp files into the right place for the fetch
* task. If the job has failed it will clean up the files.
*/
- private int close(TezWork work, int rc) {
+ int close(TezWork work, int rc) {
try {
List<BaseWork> ws = work.getAllWork();
for (BaseWork w: ws) {
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
Sun Dec 15 23:07:32 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -58,6 +59,9 @@ public class GenTezWork implements NodeP
GenTezProcContext context = (GenTezProcContext) procContext;
+ assert context != null && context.currentTask != null
+ && context.currentRootOperator != null;
+
// Operator is a file sink or reduce sink. Something that forces
// a new vertex.
Operator<?> operator = (Operator<?>) nd;
@@ -183,7 +187,7 @@ public class GenTezWork implements NodeP
return null;
}
- private ReduceWork createReduceWork(GenTezProcContext context, Operator<?>
root,
+ protected ReduceWork createReduceWork(GenTezProcContext context, Operator<?>
root,
TezWork tezWork) {
assert !root.getParentOperators().isEmpty();
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -210,7 +214,7 @@ public class GenTezWork implements NodeP
return reduceWork;
}
- private void setupReduceSink(GenTezProcContext context, ReduceWork
reduceWork,
+ protected void setupReduceSink(GenTezProcContext context, ReduceWork
reduceWork,
ReduceSinkOperator reduceSink) {
LOG.debug("Setting up reduce sink: " + reduceSink
@@ -227,7 +231,7 @@ public class GenTezWork implements NodeP
reduceSink.getConf().setOutputName(reduceWork.getName());
}
- private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
+ protected MapWork createMapWork(GenTezProcContext context, Operator<?> root,
TezWork tezWork) throws SemanticException {
assert root.getParentOperators().isEmpty();
MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
@@ -237,11 +241,19 @@ public class GenTezWork implements NodeP
assert root instanceof TableScanOperator;
String alias = ((TableScanOperator)root).getConf().getAlias();
- GenMapRedUtils.setMapWork(mapWork, context.parseContext,
- context.inputs, null, root, alias, context.conf, false);
+ setupMapWork(mapWork, context, root, alias);
+
+ // add new item to the tez work
tezWork.add(mapWork);
return mapWork;
}
+ // this method's main use is to help unit testing this class
+ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+ Operator<? extends OperatorDesc> root, String alias) throws
SemanticException {
+ // All the setup is done in GenMapRedUtils
+ GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+ context.inputs, null, root, alias, context.conf, false);
+ }
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Sun Dec 15 23:07:32 2013
@@ -91,6 +91,7 @@ public class TezCompiler extends TaskCom
opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
ReduceSinkOperator.getOperatorName() + "%"),
new SetReducerParallelism());
+
opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
(original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
Sun Dec 15 23:07:32 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -90,6 +91,24 @@ public class TezWork extends AbstractOpe
}
/**
+ * add all nodes in the collection without any connections
+ */
+ public void addAll(Collection<BaseWork> c) {
+ for (BaseWork w: c) {
+ this.add(w);
+ }
+ }
+
+ /**
+ * add all nodes in the collection without any connections
+ */
+ public void addAll(BaseWork[] bws) {
+ for (BaseWork w: bws) {
+ this.add(w);
+ }
+ }
+
+ /**
* add creates a new node in the graph without any connections
*/
public void add(BaseWork w) {
Added:
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1551076&view=auto
==============================================================================
---
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
(added)
+++
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
Sun Dec 15 23:07:32 2013
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTezTask {
+
+ DagUtils utils;
+ MapWork[] mws;
+ ReduceWork[] rws;
+ TezWork work;
+ TezTask task;
+ TezSession session;
+ TezSessionState sessionState;
+ JobConf conf;
+ LocalResource appLr;
+ Operator<?> op;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() throws Exception {
+ utils = mock(DagUtils.class);
+ when(utils.getTezDir(any(Path.class))).thenReturn(new
Path("hdfs://localhost:9000/tez/"));
+ when(utils.createVertex(any(JobConf.class), any(BaseWork.class),
any(Path.class), any(LocalResource.class),
+ any(List.class), any(FileSystem.class), any(Context.class),
anyBoolean())).thenAnswer(new Answer<Vertex>() {
+
+ @Override
+ public Vertex answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ return new Vertex(((BaseWork)args[1]).getName(),
+ mock(ProcessorDescriptor.class), 0, mock(Resource.class));
+ }
+ });
+
+ when(utils.createEdge(any(JobConf.class), any(Vertex.class),
any(JobConf.class),
+ any(Vertex.class), any(EdgeType.class))).thenAnswer(new Answer<Edge>()
{
+
+ @Override
+ public Edge answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ return new Edge((Vertex)args[1], (Vertex)args[3],
mock(EdgeProperty.class));
+ }
+ });
+
+ work = new TezWork();
+
+ mws = new MapWork[] { new MapWork(), new MapWork()};
+ rws = new ReduceWork[] { new ReduceWork(), new ReduceWork() };
+
+ work.addAll(mws);
+ work.addAll(rws);
+
+ int i = 0;
+ for (BaseWork w: work.getAllWork()) {
+ w.setName("Work "+(++i));
+ }
+
+ op = mock(Operator.class);
+
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> map
+ = new LinkedHashMap<String,Operator<? extends OperatorDesc>>();
+ map.put("foo", op);
+ mws[0].setAliasToWork(map);
+ mws[1].setAliasToWork(map);
+
+ LinkedHashMap<String, ArrayList<String>> pathMap
+ = new LinkedHashMap<String, ArrayList<String>>();
+ ArrayList<String> aliasList = new ArrayList<String>();
+ aliasList.add("foo");
+ pathMap.put("foo", aliasList);
+
+ mws[0].setPathToAliases(pathMap);
+ mws[1].setPathToAliases(pathMap);
+
+ rws[0].setReducer(op);
+ rws[1].setReducer(op);
+
+ work.connect(mws[0], rws[0], EdgeType.SIMPLE_EDGE);
+ work.connect(mws[1], rws[0], EdgeType.SIMPLE_EDGE);
+ work.connect(rws[0], rws[1], EdgeType.SIMPLE_EDGE);
+
+ task = new TezTask(utils);
+ task.setWork(work);
+ task.setConsole(mock(LogHelper.class));
+
+ conf = new JobConf();
+ appLr = mock(LocalResource.class);
+
+ session = mock(TezSession.class);
+ sessionState = mock(TezSessionState.class);
+ when(sessionState.getSession()).thenReturn(session);
+ when(session.submitDAG(any(DAG.class))).thenThrow(new
SessionNotRunning(""))
+ .thenReturn(mock(DAGClient.class));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ utils = null;
+ work = null;
+ task = null;
+ }
+
+ @Test
+ public void testBuildDag() throws IllegalArgumentException, IOException,
Exception {
+ DAG dag = task.build(conf, work, new Path("hdfs:///"), appLr, new
Context(conf));
+ for (BaseWork w: work.getAllWork()) {
+ Vertex v = dag.getVertex(w.getName());
+ assertNotNull(v);
+ List<Vertex> outs = v.getOutputVertices();
+ for (BaseWork x: work.getChildren(w)) {
+ boolean found = false;
+ for (Vertex u: outs) {
+ if (u.getVertexName().equals(x.getName())) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+ }
+ }
+
+ @Test
+ public void testEmptyWork() throws IllegalArgumentException, IOException,
Exception {
+ DAG dag = task.build(conf, new TezWork(), new Path("hdfs:///"), appLr, new
Context(conf));
+ assertEquals(dag.getVertices().size(), 0);
+ }
+
+ @Test
+ public void testSubmit() throws LoginException, IllegalArgumentException,
+ IOException, TezException, InterruptedException, URISyntaxException,
HiveException {
+ DAG dag = new DAG("test");
+ task.submit(conf, dag, new Path("hdfs:///"), appLr, sessionState);
+ // validate close/reopen
+ verify(sessionState, times(1)).open(any(String.class),
any(HiveConf.class));
+ verify(sessionState, times(1)).close(eq(true));
+ verify(session, times(2)).submitDAG(any(DAG.class));
+ }
+
+ @Test
+ public void testClose() throws HiveException {
+ task.close(work, 0);
+ verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
+ }
+}
Added:
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java?rev=1551076&view=auto
==============================================================================
---
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
(added)
+++
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
Sun Dec 15 23:07:32 2013
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for GenTezWork.
+ *
+ */
+public class TestGenTezWork {
+
+ // Fixtures: the optimizer-walker context, the processor under test (with
+ // map-work alias setup stubbed out), and a minimal TS -> RS -> FS
+ // operator pipeline representing a single map/reduce boundary.
+ GenTezProcContext ctx;
+ GenTezWork proc;
+ ReduceSinkOperator rs;
+ FileSinkOperator fs;
+ TableScanOperator ts;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() throws Exception {
+ // NOTE(review): Collections.EMPTY_LIST/EMPTY_SET are raw types; the
+ // typed Collections.<T>emptyList()/emptySet() would avoid the casts
+ // and the @SuppressWarnings above.
+ ctx = new GenTezProcContext(
+ new HiveConf(),
+ new ParseContext(),
+ (List<Task<MoveWork>>)Collections.EMPTY_LIST,
+ (List<Task<? extends Serializable>>) new ArrayList<Task<? extends
Serializable>>(),
+ (Set<ReadEntity>)Collections.EMPTY_SET,
+ (Set<WriteEntity>)Collections.EMPTY_SET);
+
+ // Override setupMapWork so the test needs no real table/alias
+ // resolution: the map work is simply rooted at the operator handed in,
+ // under a dummy alias.
+ proc = new GenTezWork() {
+ @Override
+ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+ Operator<? extends OperatorDesc> root, String alias) throws
SemanticException {
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> map
+ = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ map.put("foo", root);
+ mapWork.setAliasToWork(map);
+ return;
+ }
+ };
+
+ // Wire the TS -> RS -> FS pipeline in both directions (child and
+ // parent links), then point the context at the table scan as the
+ // current root with no preceding work.
+ fs = new FileSinkOperator();
+ fs.setConf(new FileSinkDesc());
+ rs = new ReduceSinkOperator();
+ rs.setConf(new ReduceSinkDesc());
+ ts = new TableScanOperator();
+ ts.setConf(new TableScanDesc());
+ ts.getChildOperators().add(rs);
+ rs.getParentOperators().add(ts);
+ rs.getChildOperators().add(fs);
+ fs.getParentOperators().add(rs);
+ ctx.preceedingWork = null;
+ ctx.currentRootOperator = ts;
+ }
+
+ /**
+ * Drops all fixtures so each test starts from a clean slate.
+ *
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ ctx = null;
+ proc = null;
+ ts = null;
+ rs = null;
+ fs = null;
+ }
+
+ // Processing the reduce-sink boundary should create the Tez task,
+ // register it as a root task, and generate exactly one MapWork rooted at
+ // the table scan.
+ // NOTE(review): assertEquals here is (actual, expected) — reversed from
+ // the JUnit convention; harmless except for the failure message.
+ @Test
+ public void testCreateMap() throws SemanticException {
+ proc.process(rs, null, ctx, (Object[])null);
+
+ assertNotNull(ctx.currentTask);
+ assertTrue(ctx.rootTasks.contains(ctx.currentTask));
+
+ TezWork work = ctx.currentTask.getWork();
+ assertEquals(work.getAllWork().size(),1);
+
+ BaseWork w = work.getAllWork().get(0);
+ assertTrue(w instanceof MapWork);
+
+ MapWork mw = (MapWork)w;
+
+ // need to make sure names are set for tez to connect things right
+ assertNotNull(w.getName());
+
+ // map work should start with our ts op
+ assertSame(mw.getAliasToWork().entrySet().iterator().next().getValue(),ts);
+
+ // preceeding work must be set to the newly generated map
+ assertSame(ctx.preceedingWork, mw);
+
+ // should have a new root now
+ assertSame(ctx.currentRootOperator, fs);
+ }
+
+ // After the map side is generated, processing the file sink should add a
+ // ReduceWork connected downstream of the MapWork and detach the reducer
+ // from its former parent operators.
+ @Test
+ public void testCreateReduce() throws SemanticException {
+ // create map
+ proc.process(rs, null, ctx, (Object[])null);
+
+ // create reduce
+ proc.process(fs, null, ctx, (Object[])null);
+
+ TezWork work = ctx.currentTask.getWork();
+ assertEquals(work.getAllWork().size(),2);
+
+ BaseWork w = work.getAllWork().get(1);
+ assertTrue(w instanceof ReduceWork);
+ assertTrue(work.getParents(w).contains(work.getAllWork().get(0)));
+
+ ReduceWork rw = (ReduceWork)w;
+
+ // need to make sure names are set for tez to connect things right
+ assertNotNull(w.getName());
+
+ // the reduce work's reducer should be the file sink we processed
+ // (the earlier "map work should start with our ts op" wording was a
+ // stale copy-paste from testCreateMap)
+ assertSame(rw.getReducer(),fs);
+
+ // should have severed the ties
+ assertEquals(fs.getParentOperators().size(),0);
+ }
+}