http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java deleted file mode 100644 index 718791c..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java +++ /dev/null @@ -1,356 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common.jsonexplain.tez; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.hadoop.hive.common.jsonexplain.tez.Vertex.VertexType; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -public final class Op { - public final String name; - // tezJsonParser - public final TezJsonParser parser; - public final String operatorId; - public Op parent; - public final List<Op> children; - public final Map<String, String> attrs; - // the jsonObject for this operator - public final JSONObject opObject; - // the vertex that this operator belongs to - public final Vertex vertex; - // the vertex that this operator output to - public final String outputVertexName; - // the Operator type - public final OpType type; - - public enum OpType { - MAPJOIN, MERGEJOIN, RS, OTHERS - }; - - public Op(String name, String id, String outputVertexName, List<Op> children, - Map<String, String> attrs, JSONObject opObject, Vertex vertex, TezJsonParser tezJsonParser) - throws JSONException { - super(); - this.name = name; - this.operatorId = id; - this.type = deriveOpType(operatorId); - this.outputVertexName = outputVertexName; - this.children = children; - this.attrs = attrs; - this.opObject = opObject; - this.vertex = vertex; - this.parser = tezJsonParser; - } - - private OpType deriveOpType(String operatorId) { - if (operatorId != null) { - if (operatorId.startsWith(OpType.MAPJOIN.toString())) { - return OpType.MAPJOIN; - } else if (operatorId.startsWith(OpType.MERGEJOIN.toString())) { - return OpType.MERGEJOIN; - } else if (operatorId.startsWith(OpType.RS.toString())) { - return OpType.RS; - } else { - return OpType.OTHERS; - } - } else { - return OpType.OTHERS; - } - } - - private void inlineJoinOp() throws Exception { - // inline map join operator - if (this.type == OpType.MAPJOIN) { - JSONObject joinObj = opObject.getJSONObject(this.name); - // get the map for posToVertex - JSONObject verticeObj = joinObj.getJSONObject("input vertices:"); - Map<String, Vertex> posToVertex = new LinkedHashMap<>(); - for (String pos : JSONObject.getNames(verticeObj)) { - String vertexName = verticeObj.getString(pos); - // update the connection - Connection c = null; - for (Connection connection : vertex.parentConnections) { - if (connection.from.name.equals(vertexName)) { - posToVertex.put(pos, connection.from); - c = connection; - break; - } - } - if (c != null) { - parser.addInline(this, c); - } - } - // update the attrs - this.attrs.remove("input vertices:"); - // update the keys to use operator name - JSONObject keys = joinObj.getJSONObject("keys:"); - // find out the vertex for the big table - Set<Vertex> parentVertexes = new HashSet<>(); - for (Connection connection : vertex.parentConnections) { - parentVertexes.add(connection.from); - } - parentVertexes.removeAll(posToVertex.values()); - Map<String, String> posToOpId = new LinkedHashMap<>(); - if (keys.length() != 0) { - for (String key : JSONObject.getNames(keys)) { - // first search from the posToVertex - if (posToVertex.containsKey(key)) { - Vertex vertex = posToVertex.get(key); - if (vertex.rootOps.size() == 1) { - posToOpId.put(key, vertex.rootOps.get(0).operatorId); - } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) { - posToOpId.put(key, vertex.name); - } else { - Op singleRSOp = vertex.getSingleRSOp(); - if (singleRSOp != null) { - posToOpId.put(key, singleRSOp.operatorId); - } else { - throw new Exception( - "There are none or more than one root operators in a single vertex " - + vertex.name - + " when hive explain user is trying to identify the operator id."); - } - } - } - // then search from parent - else if (parent != null) { - posToOpId.put(key, parent.operatorId); - } - // then assume it is from its own vertex - else if (parentVertexes.size() == 1) { - Vertex vertex = parentVertexes.iterator().next(); - parentVertexes.clear(); - if (vertex.rootOps.size() == 1) { - posToOpId.put(key, vertex.rootOps.get(0).operatorId); - } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) { - posToOpId.put(key, vertex.name); - } else { - Op singleRSOp = vertex.getSingleRSOp(); - if (singleRSOp != null) { - posToOpId.put(key, singleRSOp.operatorId); - } else { - throw new Exception( - "There are none or more than one root operators in a single vertex " - + vertex.name - + " when hive explain user is trying to identify the operator id."); - } - } - } - // finally throw an exception - else { - throw new Exception( - "Can not find the source operator on one of the branches of map join."); - } - } - } - this.attrs.remove("keys:"); - StringBuffer sb = new StringBuffer(); - JSONArray conditionMap = joinObj.getJSONArray("condition map:"); - for (int index = 0; index < conditionMap.length(); index++) { - JSONObject cond = conditionMap.getJSONObject(index); - String k = (String) cond.keys().next(); - JSONObject condObject = new JSONObject((String)cond.get(k)); - String type = condObject.getString("type"); - String left = condObject.getString("left"); - String right = condObject.getString("right"); - if (keys.length() != 0) { - sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "." - + keys.get(right) + "(" + type + "),"); - } else { - // probably a cross product - sb.append("(" + type + "),"); - } - } - this.attrs.remove("condition map:"); - this.attrs.put("Conds:", sb.substring(0, sb.length() - 1)); - } - // should be merge join - else { - Map<String, String> posToOpId = new LinkedHashMap<>(); - if (vertex.mergeJoinDummyVertexs.size() == 0) { - if (vertex.tagToInput.size() != vertex.parentConnections.size()) { - throw new Exception("tagToInput size " + vertex.tagToInput.size() - + " is different from parentConnections size " + vertex.parentConnections.size()); - } - for (Entry<String, String> entry : vertex.tagToInput.entrySet()) { - Connection c = null; - for (Connection connection : vertex.parentConnections) { - if (connection.from.name.equals(entry.getValue())) { - Vertex v = connection.from; - if (v.rootOps.size() == 1) { - posToOpId.put(entry.getKey(), v.rootOps.get(0).operatorId); - } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) { - posToOpId.put(entry.getKey(), v.name); - } else { - Op singleRSOp = v.getSingleRSOp(); - if (singleRSOp != null) { - posToOpId.put(entry.getKey(), singleRSOp.operatorId); - } else { - throw new Exception( - "There are none or more than one root operators in a single vertex " + v.name - + " when hive explain user is trying to identify the operator id."); - } - } - c = connection; - break; - } - } - if (c == null) { - throw new Exception("Can not find " + entry.getValue() - + " while parsing keys of merge join operator"); - } - } - } else { - posToOpId.put(vertex.tag, this.parent.operatorId); - for (Vertex v : vertex.mergeJoinDummyVertexs) { - if (v.rootOps.size() != 1) { - throw new Exception("Can not find a single root operators in a single vertex " + v.name - + " when hive explain user is trying to identify the operator id."); - } - posToOpId.put(v.tag, v.rootOps.get(0).operatorId); - } - } - JSONObject joinObj = opObject.getJSONObject(this.name); - // update the keys to use operator name - JSONObject keys = joinObj.getJSONObject("keys:"); - if (keys.length() != 0) { - for (String key : JSONObject.getNames(keys)) { - if (!posToOpId.containsKey(key)) { - throw new Exception( - "Can not find the source operator on one of the branches of merge join."); - } - } - // inline merge join operator in a self-join - if (this.vertex != null) { - for (Vertex v : this.vertex.mergeJoinDummyVertexs) { - parser.addInline(this, new Connection(null, v)); - } - } - } - // update the attrs - this.attrs.remove("keys:"); - StringBuffer sb = new StringBuffer(); - JSONArray conditionMap = joinObj.getJSONArray("condition map:"); - for (int index = 0; index < conditionMap.length(); index++) { - JSONObject cond = conditionMap.getJSONObject(index); - String k = (String) cond.keys().next(); - JSONObject condObject = new JSONObject((String)cond.get(k)); - String type = condObject.getString("type"); - String left = condObject.getString("left"); - String right = condObject.getString("right"); - if (keys.length() != 0) { - sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "." - + keys.get(right) + "(" + type + "),"); - } else { - // probably a cross product - sb.append("(" + type + "),"); - } - } - this.attrs.remove("condition map:"); - this.attrs.put("Conds:", sb.substring(0, sb.length() - 1)); - } - } - - private String getNameWithOpIdStats() { - StringBuffer sb = new StringBuffer(); - sb.append(TezJsonParserUtils.renameReduceOutputOperator(name, vertex)); - if (operatorId != null) { - sb.append(" [" + operatorId + "]"); - } - if (!TezJsonParserUtils.OperatorNoStats.contains(name) && attrs.containsKey("Statistics:")) { - sb.append(" (" + attrs.get("Statistics:") + ")"); - } - attrs.remove("Statistics:"); - return sb.toString(); - } - - /** - * @param printer - * @param indentFlag - * @param branchOfJoinOp - * This parameter is used to show if it is a branch of a Join - * operator so that we can decide the corresponding indent. - * @throws Exception - */ - public void print(Printer printer, int indentFlag, boolean branchOfJoinOp) throws Exception { - // print name - if (parser.printSet.contains(this)) { - printer.println(TezJsonParser.prefixString(indentFlag) + " Please refer to the previous " - + this.getNameWithOpIdStats()); - return; - } - parser.printSet.add(this); - if (!branchOfJoinOp) { - printer.println(TezJsonParser.prefixString(indentFlag) + this.getNameWithOpIdStats()); - } else { - printer.println(TezJsonParser.prefixString(indentFlag, "<-") + this.getNameWithOpIdStats()); - } - branchOfJoinOp = false; - // if this operator is a Map Join Operator or a Merge Join Operator - if (this.type == OpType.MAPJOIN || this.type == OpType.MERGEJOIN) { - inlineJoinOp(); - branchOfJoinOp = true; - } - // if this operator is the last operator, we summarize the non-inlined - // vertex - List<Connection> noninlined = new ArrayList<>(); - if (this.parent == null) { - if (this.vertex != null) { - for (Connection connection : this.vertex.parentConnections) { - if (!parser.isInline(connection.from)) { - noninlined.add(connection); - } - } - } - } - // print attr - indentFlag++; - if (!attrs.isEmpty()) { - printer.println(TezJsonParser.prefixString(indentFlag) - + TezJsonParserUtils.attrsToString(attrs)); - } - // print inline vertex - if (parser.inlineMap.containsKey(this)) { - for (int index = 0; index < parser.inlineMap.get(this).size(); index++) { - Connection connection = parser.inlineMap.get(this).get(index); - connection.from.print(printer, indentFlag, connection.type, this.vertex); - } - } - // print parent op, i.e., where data comes from - if (this.parent != null) { - this.parent.print(printer, indentFlag, branchOfJoinOp); - } - // print next vertex - else { - for (int index = 0; index < noninlined.size(); index++) { - Vertex v = noninlined.get(index).from; - v.print(printer, indentFlag, noninlined.get(index).type, this.vertex); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java deleted file mode 100644 index d3c91d6..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common.jsonexplain.tez; - -public final class Printer { - public static final String lineSeparator = System.getProperty("line.separator");; - private final StringBuilder builder = new StringBuilder(); - - public void print(String string) { - builder.append(string); - } - - public void println(String string) { - builder.append(string); - builder.append(lineSeparator); - } - - public void println() { - builder.append(lineSeparator); - } - - public String toString() { - return builder.toString(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java deleted file mode 100644 index 63937f8..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common.jsonexplain.tez; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.jsonexplain.tez.Vertex.VertexType; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -public final class Stage { - //external name is used to show at the console - String externalName; - //internal name is used to track the stages - public final String internalName; - //tezJsonParser - public final TezJsonParser parser; - // upstream stages, e.g., root stage - public final List<Stage> parentStages = new ArrayList<>(); - // downstream stages. - public final List<Stage> childStages = new ArrayList<>(); - public final Map<String, Vertex> vertexs =new LinkedHashMap<>(); - public final Map<String, String> attrs = new TreeMap<>(); - Map<Vertex, List<Connection>> tezStageDependency; - // some stage may contain only a single operator, e.g., create table operator, - // fetch operator. - Op op; - - public Stage(String name, TezJsonParser tezJsonParser) { - super(); - internalName = name; - externalName = name; - parser = tezJsonParser; - } - - public void addDependency(JSONObject object, Map<String, Stage> stages) throws JSONException { - if (object.has("DEPENDENT STAGES")) { - String names = object.getString("DEPENDENT STAGES"); - for (String name : names.split(",")) { - Stage parent = stages.get(name.trim()); - this.parentStages.add(parent); - parent.childStages.add(this); - } - } - if (object.has("CONDITIONAL CHILD TASKS")) { - String names = object.getString("CONDITIONAL CHILD TASKS"); - this.externalName = this.internalName + "(CONDITIONAL CHILD TASKS: " + names + ")"; - for (String name : names.split(",")) { - Stage child = stages.get(name.trim()); - child.externalName = child.internalName + "(CONDITIONAL)"; - child.parentStages.add(this); - this.childStages.add(child); - } - } - } - - /** - * @param object - * @throws Exception - * If the object of stage contains "Tez", we need to extract the - * vertices and edges Else we need to directly extract operators - * and/or attributes. - */ - public void extractVertex(JSONObject object) throws Exception { - if (object.has("Tez")) { - this.tezStageDependency = new TreeMap<>(); - JSONObject tez = (JSONObject) object.get("Tez"); - JSONObject vertices = tez.getJSONObject("Vertices:"); - if (tez.has("Edges:")) { - JSONObject edges = tez.getJSONObject("Edges:"); - // iterate for the first time to get all the vertices - for (String to : JSONObject.getNames(edges)) { - vertexs.put(to, new Vertex(to, vertices.getJSONObject(to), parser)); - } - // iterate for the second time to get all the vertex dependency - for (String to : JSONObject.getNames(edges)) { - Object o = edges.get(to); - Vertex v = vertexs.get(to); - // 1 to 1 mapping - if (o instanceof JSONObject) { - JSONObject obj = (JSONObject) o; - String parent = obj.getString("parent"); - Vertex parentVertex = vertexs.get(parent); - if (parentVertex == null) { - parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser); - vertexs.put(parent, parentVertex); - } - String type = obj.getString("type"); - // for union vertex, we reverse the dependency relationship - if (!"CONTAINS".equals(type)) { - v.addDependency(new Connection(type, parentVertex)); - parentVertex.setType(type); - parentVertex.children.add(v); - } else { - parentVertex.addDependency(new Connection(type, v)); - v.children.add(parentVertex); - } - this.tezStageDependency.put(v, Arrays.asList(new Connection(type, parentVertex))); - } else { - // 1 to many mapping - JSONArray from = (JSONArray) o; - List<Connection> list = new ArrayList<>(); - for (int index = 0; index < from.length(); index++) { - JSONObject obj = from.getJSONObject(index); - String parent = obj.getString("parent"); - Vertex parentVertex = vertexs.get(parent); - if (parentVertex == null) { - parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser); - vertexs.put(parent, parentVertex); - } - String type = obj.getString("type"); - if (!"CONTAINS".equals(type)) { - v.addDependency(new Connection(type, parentVertex)); - parentVertex.setType(type); - parentVertex.children.add(v); - } else { - parentVertex.addDependency(new Connection(type, v)); - v.children.add(parentVertex); - } - list.add(new Connection(type, parentVertex)); - } - this.tezStageDependency.put(v, list); - } - } - } else { - for (String vertexName : JSONObject.getNames(vertices)) { - vertexs.put(vertexName, new Vertex(vertexName, vertices.getJSONObject(vertexName), parser)); - } - } - // The opTree in vertex is extracted - for (Vertex v : vertexs.values()) { - if (v.vertexType == VertexType.MAP || v.vertexType == VertexType.REDUCE) { - v.extractOpTree(); - v.checkMultiReduceOperator(); - } - } - } else { - String[] names = JSONObject.getNames(object); - if (names != null) { - for (String name : names) { - if (name.contains("Operator")) { - this.op = extractOp(name, object.getJSONObject(name)); - } else { - if (!object.get(name).toString().isEmpty()) { - attrs.put(name, object.get(name).toString()); - } - } - } - } - } - } - - /** - * @param opName - * @param opObj - * @return - * @throws Exception - * This method address the create table operator, fetch operator, - * etc - */ - Op extractOp(String opName, JSONObject opObj) throws Exception { - Map<String, String> attrs = new TreeMap<>(); - Vertex v = null; - if (opObj.length() > 0) { - String[] names = JSONObject.getNames(opObj); - for (String name : names) { - Object o = opObj.get(name); - if (isPrintable(o) && !o.toString().isEmpty()) { - attrs.put(name, o.toString()); - } else if (o instanceof JSONObject) { - JSONObject attrObj = (JSONObject) o; - if (attrObj.length() > 0) { - if (name.equals("Processor Tree:")) { - JSONObject object = new JSONObject(new LinkedHashMap<>()); - object.put(name, attrObj); - v = new Vertex(null, object, parser); - v.extractOpTree(); - } else { - for (String attrName : JSONObject.getNames(attrObj)) { - if (!attrObj.get(attrName).toString().isEmpty()) { - attrs.put(attrName, attrObj.get(attrName).toString()); - } - } - } - } - } else { - throw new Exception("Unsupported object in " + this.internalName); - } - } - } - Op op = new Op(opName, null, null, null, attrs, null, v, parser); - if (v != null) { - parser.addInline(op, new Connection(null, v)); - } - return op; - } - - private boolean isPrintable(Object val) { - if (val instanceof Boolean || val instanceof String || val instanceof Integer - || val instanceof Long || val instanceof Byte || val instanceof Float - || val instanceof Double || val instanceof Path) { - return true; - } - if (val != null && val.getClass().isPrimitive()) { - return true; - } - return false; - } - - public void print(Printer printer, int indentFlag) throws Exception { - // print stagename - if (parser.printSet.contains(this)) { - printer.println(TezJsonParser.prefixString(indentFlag) + " Please refer to the previous " - + externalName); - return; - } - parser.printSet.add(this); - printer.println(TezJsonParser.prefixString(indentFlag) + externalName); - // print vertexes - indentFlag++; - for (Vertex candidate : this.vertexs.values()) { - if (!parser.isInline(candidate) && candidate.children.isEmpty()) { - candidate.print(printer, indentFlag, null, null); - } - } - if (!attrs.isEmpty()) { - printer.println(TezJsonParser.prefixString(indentFlag) - + TezJsonParserUtils.attrsToString(attrs)); - } - if (op != null) { - op.print(printer, indentFlag, false); - } - indentFlag++; - // print dependent stages - for (Stage stage : this.parentStages) { - stage.print(printer, indentFlag); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java index ea86048..294dc6b 100644 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java @@ -18,146 +18,29 @@ package org.apache.hadoop.hive.common.jsonexplain.tez; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; +import org.apache.hadoop.hive.common.jsonexplain.DagJsonParser; -import org.apache.hadoop.hive.common.jsonexplain.JsonParser; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public final class TezJsonParser implements JsonParser { - public final Map<String, Stage> stages = new LinkedHashMap<>(); - protected final Logger LOG; - // the objects that have been printed. - public final Set<Object> printSet = new LinkedHashSet<>(); - // the vertex that should be inlined. <Operator, list of Vertex that is - // inlined> - public final Map<Op, List<Connection>> inlineMap = new LinkedHashMap<>(); - - public TezJsonParser() { - super(); - LOG = LoggerFactory.getLogger(this.getClass().getName()); - } - - public void extractStagesAndPlans(JSONObject inputObject) throws Exception { - // extract stages - JSONObject dependency = inputObject.getJSONObject("STAGE DEPENDENCIES"); - if (dependency != null && dependency.length() > 0) { - // iterate for the first time to get all the names of stages. - for (String stageName : JSONObject.getNames(dependency)) { - this.stages.put(stageName, new Stage(stageName, this)); - } - // iterate for the second time to get all the dependency. - for (String stageName : JSONObject.getNames(dependency)) { - JSONObject dependentStageNames = dependency.getJSONObject(stageName); - this.stages.get(stageName).addDependency(dependentStageNames, this.stages); - } - } - // extract stage plans - JSONObject stagePlans = inputObject.getJSONObject("STAGE PLANS"); - if (stagePlans != null && stagePlans.length() > 0) { - for (String stageName : JSONObject.getNames(stagePlans)) { - JSONObject stagePlan = stagePlans.getJSONObject(stageName); - this.stages.get(stageName).extractVertex(stagePlan); - } - } - } - - /** - * @param indentFlag - * help to generate correct indent - * @return - */ - public static String prefixString(int indentFlag) { - StringBuilder sb = new StringBuilder(); - for (int index = 0; index < indentFlag; index++) { - sb.append(" "); - } - return sb.toString(); - } - - /** - * @param indentFlag - * @param tail - * help to generate correct indent with a specific tail - * @return - */ - public static String prefixString(int indentFlag, String tail) { - StringBuilder sb = new StringBuilder(); - for (int index = 0; index < indentFlag; index++) { - sb.append(" "); - } - int len = sb.length(); - return sb.replace(len - tail.length(), len, tail).toString(); - } +public class TezJsonParser extends DagJsonParser { @Override - public void print(JSONObject inputObject, PrintStream outputStream) throws Exception { - LOG.info("JsonParser is parsing:" + inputObject.toString()); - this.extractStagesAndPlans(inputObject); - Printer printer = new Printer(); - // print out the cbo info - if (inputObject.has("cboInfo")) { - printer.println(inputObject.getString("cboInfo")); - printer.println(); - } - // print out the vertex dependency in root stage - for (Stage candidate : this.stages.values()) { - if (candidate.tezStageDependency != null && candidate.tezStageDependency.size() > 0) { - printer.println("Vertex dependency in root stage"); - for (Entry<Vertex, List<Connection>> entry : candidate.tezStageDependency.entrySet()) { - StringBuilder sb = new StringBuilder(); - sb.append(entry.getKey().name); - sb.append(" <- "); - boolean printcomma = false; - for (Connection connection : entry.getValue()) { - if (printcomma) { - sb.append(", "); - } else { - printcomma = true; - } - sb.append(connection.from.name + " (" + connection.type + ")"); - } - printer.println(sb.toString()); - } - printer.println(); - } + public String mapEdgeType(String edgeName) { + switch (edgeName) { + case "BROADCAST_EDGE": + return "BROADCAST"; + case "SIMPLE_EDGE": + return "SHUFFLE"; + case "CUSTOM_SIMPLE_EDGE": + return "PARTITION_ONLY_SHUFFLE"; + case "CUSTOM_EDGE": + return "MULTICAST"; + default: + return "UNKNOWN"; } - // print out all the stages that have no childStages. - for (Stage candidate : this.stages.values()) { - if (candidate.childStages.isEmpty()) { - candidate.print(printer, 0); - } - } - outputStream.println(printer.toString()); } - public void addInline(Op op, Connection connection) { - List<Connection> list = inlineMap.get(op); - if (list == null) { - list = new ArrayList<>(); - list.add(connection); - inlineMap.put(op, list); - } else { - list.add(connection); - } - } - - public boolean isInline(Vertex v) { - for (List<Connection> list : inlineMap.values()) { - for (Connection connection : list) { - if (connection.from.equals(v)) { - return true; - } - } - } - return false; + @Override + public String getFrameworkName() { + return "Tez"; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java deleted file mode 100644 index 363a422..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common.jsonexplain.tez; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - - -public class TezJsonParserUtils { - - public static List<String> OperatorNoStats = Arrays.asList(new String[] { "File Output Operator", - "Reduce Output Operator" }); - - public static String renameReduceOutputOperator(String operatorName, Vertex vertex) { - if (operatorName.equals("Reduce Output Operator") && vertex.edgeType != null) { - return vertex.edgeType.name(); - } else { - return operatorName; - } - } - - public static String attrsToString(Map<String, String> attrs) { - StringBuffer sb = new StringBuffer(); - boolean first = true; - for (Entry<String, String> entry : attrs.entrySet()) { - if (first) { - first = false; - } else { - sb.append(","); - } - sb.append(entry.getKey() + entry.getValue()); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java deleted file mode 100644 index 3d559bd..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java +++ /dev/null @@ -1,331 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common.jsonexplain.tez; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.hadoop.hive.common.jsonexplain.tez.Op.OpType; -import org.apache.hadoop.util.hash.Hash; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.map.JsonMappingException; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -public final class Vertex implements Comparable<Vertex>{ - public final String name; - //tezJsonParser - public final TezJsonParser parser; - // vertex's parent connections. - public final List<Connection> parentConnections = new ArrayList<>(); - // vertex's children vertex. - public final List<Vertex> children = new ArrayList<>(); - // the jsonObject for this vertex - public final JSONObject vertexObject; - // whether this vertex is dummy (which does not really exists but is created), - // e.g., a dummy vertex for a mergejoin branch - public boolean dummy; - // the rootOps in this vertex - public final List<Op> rootOps = new ArrayList<>(); - // we create a dummy vertex for a mergejoin branch for a self join if this - // vertex is a mergejoin - public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>(); - // whether this vertex has multiple reduce operators - public boolean hasMultiReduceOp = false; - // execution mode - public String executionMode = ""; - // tagToInput for reduce work - public Map<String, String> tagToInput = new LinkedHashMap<>(); - // tag - public String tag; - - public static enum VertexType { - MAP, REDUCE, UNION, UNKNOWN - }; - public VertexType vertexType; - - public static enum EdgeType { - BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN - }; - public EdgeType edgeType; - - public Vertex(String name, JSONObject vertexObject, TezJsonParser tezJsonParser) { - super(); - this.name = name; - if (this.name != null) { - if (this.name.contains("Map")) { - this.vertexType = VertexType.MAP; - } else if (this.name.contains("Reduce")) { - this.vertexType = VertexType.REDUCE; - } else if (this.name.contains("Union")) { - this.vertexType = VertexType.UNION; - } else { - this.vertexType = VertexType.UNKNOWN; - } - } else { - this.vertexType = VertexType.UNKNOWN; - } - this.dummy = false; - this.vertexObject = vertexObject; - parser = tezJsonParser; - } - - public void addDependency(Connection connection) throws JSONException { - this.parentConnections.add(connection); - } - - /** - * @throws JSONException - * @throws JsonParseException - * @throws JsonMappingException - * @throws IOException - * @throws Exception - * We assume that there is a single top-level Map Operator Tree or a - * Reduce Operator Tree in a vertex - */ - public void extractOpTree() throws JSONException, JsonParseException, JsonMappingException, - IOException, Exception { - if (vertexObject.length() != 0) { - for (String key : JSONObject.getNames(vertexObject)) { - if (key.equals("Map Operator Tree:")) { - extractOp(vertexObject.getJSONArray(key).getJSONObject(0)); - } else if (key.equals("Reduce Operator Tree:") || key.equals("Processor Tree:")) { - extractOp(vertexObject.getJSONObject(key)); - } else if (key.equals("Join:")) { - // this is the case when we have a map-side SMB join - // one input of the join is treated as a dummy vertex - JSONArray array = vertexObject.getJSONArray(key); - for (int index = 0; index < array.length(); index++) { - JSONObject mpOpTree = array.getJSONObject(index); - Vertex v = new Vertex(null, mpOpTree, parser); - v.extractOpTree(); - v.dummy = true; - mergeJoinDummyVertexs.add(v); - } - } else if (key.equals("Merge File Operator")) { - JSONObject opTree = vertexObject.getJSONObject(key); - if (opTree.has("Map Operator Tree:")) { - extractOp(opTree.getJSONArray("Map Operator Tree:").getJSONObject(0)); - } else { - throw new Exception("Merge File Operator does not have a Map Operator Tree"); - } - } else if (key.equals("Execution mode:")) { - executionMode = " " + vertexObject.getString(key); - } else if (key.equals("tagToInput:")) { - JSONObject tagToInput = vertexObject.getJSONObject(key); - for (String tag : JSONObject.getNames(tagToInput)) { - this.tagToInput.put(tag, (String) tagToInput.get(tag)); - } - } else if (key.equals("tag:")) { - this.tag = vertexObject.getString(key); - } else { - throw new Exception("Unsupported operator tree in vertex " + this.name); - } - } - } - } - - /** - * @param operator - * @param parent - * @return - * @throws JSONException - * @throws JsonParseException - * @throws JsonMappingException - * @throws IOException - * @throws Exception - * assumption: each operator only has one parent but may have many - * children - */ - Op extractOp(JSONObject operator) throws JSONException, JsonParseException, JsonMappingException, - IOException, Exception { - String[] names = JSONObject.getNames(operator); - if (names.length != 1) { - throw new Exception("Expect only one operator in " + operator.toString()); - } else { - String opName = names[0]; - JSONObject attrObj = (JSONObject) operator.get(opName); - Map<String, String> attrs = new TreeMap<>(); - List<Op> children = new ArrayList<>(); - String id = null; - String outputVertexName = null; - for (String attrName : JSONObject.getNames(attrObj)) { - if (attrName.equals("children")) { - Object childrenObj = attrObj.get(attrName); - if (childrenObj instanceof JSONObject) { - if (((JSONObject) childrenObj).length() != 0) { - children.add(extractOp((JSONObject) childrenObj)); - } - } else if (childrenObj instanceof JSONArray) { - if (((JSONArray) childrenObj).length() != 0) { - JSONArray array = ((JSONArray) childrenObj); - for (int index = 0; index < array.length(); index++) { - children.add(extractOp(array.getJSONObject(index))); - } - } - } else { - throw new Exception("Unsupported operator " + this.name - + "'s children operator is neither a jsonobject nor a jsonarray"); - } - } else { - if (attrName.equals("OperatorId:")) { - id = attrObj.get(attrName).toString(); - } else if (attrName.equals("outputname:")) { - outputVertexName = attrObj.get(attrName).toString(); - } else { - if (!attrObj.get(attrName).toString().isEmpty()) { - attrs.put(attrName, attrObj.get(attrName).toString()); - } - } - } - } - Op op = new Op(opName, id, outputVertexName, children, attrs, operator, this, parser); - if (!children.isEmpty()) { - for (Op child : children) { - child.parent = op; - } - } else { - this.rootOps.add(op); - } - return op; - } - } - - public void print(Printer printer, int indentFlag, String type, Vertex callingVertex) - throws JSONException, Exception { - // print vertexname - if (parser.printSet.contains(this) && !hasMultiReduceOp) { - if (type != null) { - printer.println(TezJsonParser.prefixString(indentFlag, "<-") - + " Please refer to the previous " + this.name + " [" + type + "]"); - } else { - printer.println(TezJsonParser.prefixString(indentFlag, "<-") - + " Please refer to the previous " + this.name); - } - return; - } - parser.printSet.add(this); - if (type != null) { - printer.println(TezJsonParser.prefixString(indentFlag, "<-") + this.name + " [" + type + "]" - + this.executionMode); - } else if (this.name != null) { - printer.println(TezJsonParser.prefixString(indentFlag) + this.name + this.executionMode); - } - // print operators - if (hasMultiReduceOp && !(callingVertex.vertexType == VertexType.UNION)) { - // find the right op - Op choose = null; - for (Op op : this.rootOps) { - if (op.outputVertexName.equals(callingVertex.name)) { - choose = op; - } - } - if (choose != null) { - choose.print(printer, indentFlag, false); - } else { - throw new Exception("Can not find the right reduce output operator for vertex " + this.name); - } - } else { - for (Op op : this.rootOps) { - // dummy vertex is treated as a branch of a join operator - if (this.dummy) { - op.print(printer, indentFlag, true); - } else { - op.print(printer, indentFlag, false); - } - } - } - if (vertexType == VertexType.UNION) { - // print dependent vertexs - indentFlag++; - for (int index = 0; index < this.parentConnections.size(); index++) { - Connection connection = this.parentConnections.get(index); - connection.from.print(printer, indentFlag, connection.type, this); - } - } - } - - /** - * We check if a vertex has multiple reduce operators. - */ - public void checkMultiReduceOperator() { - // check if it is a reduce vertex and its children is more than 1; - if (!this.name.contains("Reduce") || this.rootOps.size() < 2) { - return; - } - // check if all the child ops are reduce output operators - for (Op op : this.rootOps) { - if (op.type != OpType.RS) { - return; - } - } - this.hasMultiReduceOp = true; - } - - public void setType(String type) { - switch (type) { - case "BROADCAST_EDGE": - this.edgeType = EdgeType.BROADCAST; - break; - case "SIMPLE_EDGE": - this.edgeType = EdgeType.SHUFFLE; - break; - case "CUSTOM_SIMPLE_EDGE": - this.edgeType = EdgeType.PARTITION_ONLY_SHUFFLE; - break; - case "CUSTOM_EDGE": - this.edgeType = EdgeType.MULTICAST; - break; - default: - this.edgeType = EdgeType.UNKNOWN; - } - } - - //The following code should be gone after HIVE-11075 using topological order - @Override - public int compareTo(Vertex o) { - return this.name.compareTo(o.name); - } - - public Op getSingleRSOp() { - if (rootOps.size() == 0) { - return null; - } else { - Op ret = null; - for (Op op : rootOps) { - if (op.type == OpType.RS) { - if (ret == null) { - ret = op; - } else { - // find more than one RS Op - return null; - } - } - } - return ret; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java index 6db5c18..1e026a7 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.common.log; import com.google.common.base.Function; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java index ee02ccb..e7661b4 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.common.log; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java index e8abf6c..2d6c1b4 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java @@ -44,6 +44,8 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -190,22 +192,8 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co registerAll("threads", new ThreadStatesGaugeSet()); registerAll("classLoading", new ClassLoadingGaugeSet()); - //Metrics reporter - Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>(); - List<String> metricsReporterNames = Lists.newArrayList( - Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER))); - - if(metricsReporterNames != null) { - for (String metricsReportingName : metricsReporterNames) { - try { - MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase()); - finalReporterList.add(reporter); - } catch (IllegalArgumentException e) { - LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName); - } - } - } - initReporting(finalReporterList); + //initialize reporters + initReporting(); } @@ -385,107 +373,99 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co } /** - * Should be only called once to initialize the reporters + * Initializes reporters from HIVE_CODAHALE_METRICS_REPORTER_CLASSES or HIVE_METRICS_REPORTER if the former is not defined. + * Note: if both confs are defined, only HIVE_CODAHALE_METRICS_REPORTER_CLASSES will be used. */ - private void initReporting(Set<MetricsReporting> reportingSet) { - for (MetricsReporting reporting : reportingSet) { - switch(reporting) { - case CONSOLE: - final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - consoleReporter.start(1, TimeUnit.SECONDS); - reporters.add(consoleReporter); - break; - case JMX: - final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - jmxReporter.start(); - reporters.add(jmxReporter); - break; - case JSON_FILE: - final JsonFileReporter jsonFileReporter = new JsonFileReporter(); - jsonFileReporter.start(); - reporters.add(jsonFileReporter); - break; - case HADOOP2: - String applicationName = conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_COMPONENT_NAME.varname); - long reportingInterval = HiveConf.toTime( - conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_INTERVAL.varname), - TimeUnit.SECONDS, TimeUnit.SECONDS); - final HadoopMetrics2Reporter metrics2Reporter = HadoopMetrics2Reporter.forRegistry(metricRegistry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(DefaultMetricsSystem.initialize(applicationName), // The application-level name - applicationName, // Component name - applicationName, // Component description - "General"); // Name for each metric record - metrics2Reporter.start(reportingInterval, TimeUnit.SECONDS); - break; - } + private void initReporting() { + + if (!(initCodahaleMetricsReporterClasses() || initMetricsReporter())) { + LOGGER.warn("Unable to initialize metrics reporting"); + } + if (reporters.isEmpty()) { + // log a warning incase no reporters were successfully added + LOGGER.warn("No reporters configured for codahale metrics!"); } } - class JsonFileReporter implements Closeable { - private ObjectMapper jsonMapper = null; - private java.util.Timer timer = null; - - public void start() { - this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false)); - this.timer = new java.util.Timer(true); + /** + * Initializes reporting using HIVE_CODAHALE_METRICS_REPORTER_CLASSES. + * @return whether initialization was successful or not + */ + private boolean initCodahaleMetricsReporterClasses() { - long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS); - final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION); + List<String> reporterClasses = Lists.newArrayList(Splitter.on(",").trimResults(). + omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_CODAHALE_METRICS_REPORTER_CLASSES))); + if (reporterClasses.isEmpty()) { + return false; + } - timer.schedule(new TimerTask() { - @Override - public void run() { - BufferedWriter bw = null; - try { - String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); - Path tmpPath = new Path(pathString + ".tmp"); - URI tmpPathURI = tmpPath.toUri(); - FileSystem fs = null; - if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) { - //default local - fs = FileSystem.getLocal(conf); - } else { - fs = FileSystem.get(tmpPathURI, conf); - } - fs.delete(tmpPath, true); - bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true))); - bw.write(json); - bw.close(); - fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644)); - - Path path = new Path(pathString); - fs.rename(tmpPath, path); - fs.setPermission(path, FsPermission.createImmutable((short) 0644)); - } catch (Exception e) { - LOGGER.warn("Error writing JSON Metrics to file", e); - } finally { - try { - if (bw != null) { - bw.close(); - } - } catch (IOException e) { - //Ignore. - } - } + for (String reporterClass : reporterClasses) { + Class name = null; + try { + name = conf.getClassByName(reporterClass); + } catch (ClassNotFoundException e) { + LOGGER.error("Unable to instantiate metrics reporter class " + reporterClass + + " from conf HIVE_CODAHALE_METRICS_REPORTER_CLASSES", e); + throw new IllegalArgumentException(e); + } + try { + Constructor constructor = name.getConstructor(MetricRegistry.class, HiveConf.class); + CodahaleReporter reporter = (CodahaleReporter) constructor.newInstance(metricRegistry, conf); + reporter.start(); + reporters.add(reporter); + } catch (NoSuchMethodException | InstantiationException | + IllegalAccessException | InvocationTargetException e) { + LOGGER.error("Unable to instantiate using constructor(MetricRegistry, HiveConf) for" + + " reporter " + reporterClass + " from conf HIVE_CODAHALE_METRICS_REPORTER_CLASSES", + e); + throw new IllegalArgumentException(e); + } + } + return true; + } + /** + * Initializes reporting using HIVE_METRICS+REPORTER. + * @return whether initialization was successful or not + */ + private boolean initMetricsReporter() { - } - }, 0, time); + List<String> metricsReporterNames = Lists.newArrayList(Splitter.on(",").trimResults(). + omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER))); + if (metricsReporterNames.isEmpty()) { + return false; } - @Override - public void close() { - if (timer != null) { - this.timer.cancel(); + MetricsReporting reporter = null; + for (String metricsReportingName : metricsReporterNames) { + try { + reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase()); + } catch (IllegalArgumentException e) { + LOGGER.error("Invalid reporter name " + metricsReportingName, e); + throw e; + } + CodahaleReporter codahaleReporter = null; + switch (reporter) { + case CONSOLE: + codahaleReporter = new ConsoleMetricsReporter(metricRegistry, conf); + break; + case JMX: + codahaleReporter = new JmxMetricsReporter(metricRegistry, conf); + break; + case JSON_FILE: + codahaleReporter = new JsonFileMetricsReporter(metricRegistry, conf); + break; + case HADOOP2: + codahaleReporter = new Metrics2Reporter(metricRegistry, conf); + break; + default: + LOGGER.warn("Unhandled reporter " + reporter + " provided."); + } + if (codahaleReporter != null) { + codahaleReporter.start(); + reporters.add(codahaleReporter); } } + return true; } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java new file mode 100644 index 0000000..9424f28 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.Reporter; +import java.io.Closeable; + +public interface CodahaleReporter extends Closeable, Reporter { + + /** + * Start the reporter. + */ + public void start(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java new file mode 100644 index 0000000..dea1848 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; +import java.io.Closeable; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.conf.HiveConf; + + +/** + * A wrapper around Codahale ConsoleReporter to make it a pluggable/configurable Hive Metrics reporter. + */ +public class ConsoleMetricsReporter implements CodahaleReporter { + + private final ConsoleReporter reporter; + + public ConsoleMetricsReporter(MetricRegistry registry, HiveConf conf) { + + reporter = ConsoleReporter.forRegistry(registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + + } + + @Override + public void start() { + reporter.start(1, TimeUnit.SECONDS); + } + + @Override + public void close() { + reporter.close(); + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java new file mode 100644 index 0000000..f12adf9 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; +import java.io.Closeable; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * A wrapper around Codahale JmxReporter to make it a pluggable/configurable Hive Metrics reporter. + */ +public class JmxMetricsReporter implements CodahaleReporter { + + private final MetricRegistry registry; + private final HiveConf conf; + private final JmxReporter jmxReporter; + + public JmxMetricsReporter(MetricRegistry registry, HiveConf conf) { + this.registry = registry; + this.conf = conf; + + jmxReporter = JmxReporter.forRegistry(registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + } + + @Override + public void start() { + jmxReporter.start(); + } + + @Override + public void close() { + jmxReporter.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java new file mode 100644 index 0000000..c07517a --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.json.MetricsModule; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A metrics reporter for CodahaleMetrics that dumps metrics periodically into a file in JSON format. + */ + +public class JsonFileMetricsReporter implements CodahaleReporter { + + private final MetricRegistry metricRegistry; + private final ObjectWriter jsonWriter; + private final ScheduledExecutorService executorService; + private final HiveConf conf; + private final long interval; + private final String pathString; + private final Path path; + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonFileMetricsReporter.class); + + public JsonFileMetricsReporter(MetricRegistry registry, HiveConf conf) { + this.metricRegistry = registry; + this.jsonWriter = + new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, + TimeUnit.MILLISECONDS, false)).writerWithDefaultPrettyPrinter(); + executorService = Executors.newSingleThreadScheduledExecutor(); + this.conf = conf; + + interval = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS); + pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION); + path = new Path(pathString); + } + + @Override + public void start() { + + final Path tmpPath = new Path(pathString + ".tmp"); + URI tmpPathURI = tmpPath.toUri(); + final FileSystem fs; + try { + if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) { + //default local + fs = FileSystem.getLocal(conf); + } else { + fs = FileSystem.get(tmpPathURI, conf); + } + } + catch (IOException e) { + LOGGER.error("Unable to access filesystem for path " + tmpPath + ". Aborting reporting", e); + return; + } + + Runnable task = new Runnable() { + public void run() { + try { + String json = null; + try { + json = jsonWriter.writeValueAsString(metricRegistry); + } catch (JsonProcessingException e) { + LOGGER.error("Unable to convert json to string ", e); + return; + } + + BufferedWriter bw = null; + try { + fs.delete(tmpPath, true); + bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true))); + bw.write(json); + fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644)); + } catch (IOException e) { + LOGGER.error("Unable to write to temp file " + tmpPath, e); + return; + } finally { + if (bw != null) { + bw.close(); + } + } + + try { + fs.rename(tmpPath, path); + fs.setPermission(path, FsPermission.createImmutable((short) 0644)); + } catch (IOException e) { + LOGGER.error("Unable to rename temp file " + tmpPath + " to " + pathString, e); + return; + } + } catch (Throwable t) { + // catch all errors (throwable and execptions to prevent subsequent tasks from being suppressed) + LOGGER.error("Error executing scheduled task ", t); + } + } + }; + + executorService.scheduleWithFixedDelay(task,0, interval, TimeUnit.MILLISECONDS); + } + + @Override + public void close() { + executorService.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java new file mode 100644 index 0000000..3b402d8 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.MetricRegistry; +import com.github.joshelser.dropwizard.metrics.hadoop.HadoopMetrics2Reporter; +import java.io.Closeable; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import com.codahale.metrics.Reporter; + +/** + * A wrapper around Codahale HadoopMetrics2Reporter to make it a pluggable/configurable Hive Metrics reporter. + */ +public class Metrics2Reporter implements CodahaleReporter { + + private final MetricRegistry metricRegistry; + private final HiveConf conf; + private final HadoopMetrics2Reporter reporter; + + public Metrics2Reporter(MetricRegistry registry, HiveConf conf) { + this.metricRegistry = registry; + this.conf = conf; + String applicationName = conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_COMPONENT_NAME.varname); + + reporter = HadoopMetrics2Reporter.forRegistry(metricRegistry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(DefaultMetricsSystem.initialize(applicationName), // The application-level name + applicationName, // Component name + applicationName, // Component description + "General"); // Name for each metric record + } + + @Override + public void start() { + long reportingInterval = + HiveConf.toTime(conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_INTERVAL.varname), TimeUnit.SECONDS, TimeUnit.SECONDS); + reporter.start(reportingInterval, TimeUnit.SECONDS); + } + + @Override + public void close() { + reporter.close(); + } +}
