HIVE-12466: SparkCounter not initialized error (Rui via Chengxiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/120df071 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/120df071 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/120df071 Branch: refs/heads/master Commit: 120df07186703dd2ecc930bbc5dfda191ad40773 Parents: 5cd4891 Author: chengxiang <[email protected]> Authored: Wed Nov 25 11:07:12 2015 +0800 Committer: Rui Li <[email protected]> Committed: Thu Jan 28 14:22:09 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/FileSinkOperator.java | 17 ++++++++++------- .../hadoop/hive/ql/exec/ReduceSinkOperator.java | 14 +++++++++----- .../hadoop/hive/ql/exec/spark/SparkTask.java | 4 ++-- 3 files changed, 21 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 14121b6..0899793 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -441,13 +441,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements cntr = 1; logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); - String suffix = Integer.toString(conf.getDestTableId()); - String fullName = conf.getTableInfo().getTableName(); - if (fullName != null) { - suffix = suffix + "_" + fullName.toLowerCase(); - } - - statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count); + statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count); } catch (HiveException e) { throw e; } catch (Exception e) { @@ -456,6 +450,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } + public String getCounterName(Counter counter) { + String suffix = Integer.toString(conf.getDestTableId()); + String fullName = conf.getTableInfo().getTableName(); + if (fullName != null) { + suffix = suffix + "_" + fullName.toLowerCase(); + } + return counter + "_" + suffix; + } + private void logOutputFormatError(Configuration hconf, HiveException ex) { StringWriter errorWriter = new StringWriter(); errorWriter.append("Failed to create output format; configuration: "); http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 74b4802..e692460 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -170,11 +170,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> cntr = 1; logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); - String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); - if (context != null && !context.isEmpty()) { - context = "_" + context.replace(" ","_"); - } - statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter); + statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter); List<ExprNodeDesc> keys = conf.getKeyCols(); @@ -256,6 +252,14 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> } } + public String getCounterName(Counter counter, Configuration hconf) { + String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); + if (context != null && !context.isEmpty()) { + context = "_" + context.replace(" ", "_"); + } + return counter + context; + } + /** * Initializes array of ExprNodeEvaluator. Adds Union field for distinct http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index eb93aca..faa075a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -386,11 +386,11 @@ public class SparkTask extends Task<SparkWork> { for (Operator<? extends OperatorDesc> operator : work.getAllOperators()) { if (operator instanceof FileSinkOperator) { for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) { - hiveCounters.add(counter.toString()); + hiveCounters.add(((FileSinkOperator) operator).getCounterName(counter)); } } else if (operator instanceof ReduceSinkOperator) { for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) { - hiveCounters.add(counter.toString()); + hiveCounters.add(((ReduceSinkOperator) operator).getCounterName(counter, conf)); } } else if (operator instanceof ScriptOperator) { for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
