HIVE-12515: Clean up the SparkCounters related code after removing counter-based stats collection [Spark Branch] (Rui, reviewed by Chengxiang & Xuefu)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
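For context, a minimal sketch of the counter wiring that remains after this cleanup (illustrative only; the class name and the two counter strings below are placeholders, not the committed code). With counter-based stats collection removed, SparkTask only has to hand the Spark job a map from the configured Hive counter group to the operator-level counter names it should aggregate, and no longer derives table/partition prefixes from a downstream StatsTask:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of the shape of SparkTask#getOperatorCounters after the
    // cleanup: counter group name -> operator counter names to aggregate. The real
    // method reads the group from HiveConf.ConfVars.HIVECOUNTERGROUP and the names
    // from the operator counter enums; the literals below are placeholders.
    public class OperatorCounterSketch {
      static Map<String, List<String>> operatorCounters(String groupName) {
        Map<String, List<String>> counters = new HashMap<String, List<String>>();
        List<String> hiveCounters = new LinkedList<String>();
        counters.put(groupName, hiveCounters);
        hiveCounters.add("CREATED_FILES");       // placeholder operator counter
        hiveCounters.add("DESERIALIZE_ERRORS");  // placeholder operator counter
        return counters;
      }

      public static void main(String[] args) {
        System.out.println(operatorCounters("HIVE"));
      }
    }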
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72e070f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72e070f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72e070f4
Branch: refs/heads/master
Commit: 72e070f432db3eadc9b6d538157714cf8f47fca1
Parents: 4a41ba5
Author: Rui Li <[email protected]>
Authored: Thu Dec 3 16:37:05 2015 +0800
Committer: Rui Li <[email protected]>
Committed: Thu Jan 28 14:49:12 2016 +0800
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 4 -
.../hadoop/hive/ql/exec/spark/SparkTask.java | 146 +------------------
2 files changed, 1 insertion(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4df7d25..ec6a2c7 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1091,8 +1091,6 @@ spark.query.files=add_part_multiple.q, \
stats7.q, \
stats8.q, \
stats9.q, \
- stats_counter.q, \
- stats_counter_partitioned.q, \
stats_noscan_1.q, \
stats_noscan_2.q, \
stats_only_null.q, \
@@ -1298,8 +1296,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
schemeAuthority2.q,\
scriptfile1.q,\
scriptfile1_win.q,\
- stats_counter.q,\
- stats_counter_partitioned.q,\
temp_table_external.q,\
truncate_column_buckets.q,\
uber_reduce.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index faa075a..26cce1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,11 +28,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -44,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -58,25 +51,15 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@@ -86,7 +69,6 @@ public class SparkTask extends Task<SparkWork> {
private static final LogHelper console = new LogHelper(LOG);
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final long serialVersionUID = 1L;
- private SparkCounters sparkCounters;
@Override
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext,
@@ -106,7 +88,7 @@ public class SparkTask extends Task<SparkWork> {
sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
SparkWork sparkWork = getWork();
- sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+ sparkWork.setRequiredCounterPrefix(getOperatorCounters());
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -116,8 +98,6 @@ public class SparkTask extends Task<SparkWork> {
rc = jobRef.monitorJob();
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
if (rc == 0) {
- sparkCounters = sparkJobStatus.getCounter();
- // for RSC, we should get the counters after job has finished
SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -233,10 +213,6 @@ public class SparkTask extends Task<SparkWork> {
return ((ReduceWork) children.get(0)).getReducer();
}
- public SparkCounters getSparkCounters() {
- return sparkCounters;
- }
-
/**
* Set the number of reducers for the spark work.
*/
@@ -250,126 +226,6 @@ public class SparkTask extends Task<SparkWork> {
console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
}
- private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
- Map<String, List<String>> counters = getOperatorCounters();
- StatsTask statsTask = getStatsTaskInChildTasks(this);
- String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- // fetch table prefix if SparkTask try to gather table statistics based on counter.
- if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
- List<String> prefixes = getRequiredCounterPrefix(statsTask);
- for (String prefix : prefixes) {
- List<String> counterGroup = counters.get(prefix);
- if (counterGroup == null) {
- counterGroup = new LinkedList<String>();
- counters.put(prefix, counterGroup);
- }
- counterGroup.add(StatsSetupConst.ROW_COUNT);
- counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
- }
- }
- return counters;
- }
-
- private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
- List<String> prefixs = new LinkedList<String>();
- StatsWork statsWork = statsTask.getWork();
- String tablePrefix = getTablePrefix(statsWork);
- List<Map<String, String>> partitionSpecs = getPartitionSpecs(statsWork);
-
- if (partitionSpecs == null) {
- prefixs.add(tablePrefix.endsWith(Path.SEPARATOR) ? tablePrefix : tablePrefix + Path.SEPARATOR);
- } else {
- for (Map<String, String> partitionSpec : partitionSpecs) {
- String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec));
- prefixs.add(prefixWithPartition.endsWith(Path.SEPARATOR) ? prefixWithPartition : prefixWithPartition + Path.SEPARATOR);
- }
- }
-
- return prefixs;
- }
-
- private String getTablePrefix(StatsWork work) throws HiveException {
- String tableName;
- if (work.getLoadTableDesc() != null) {
- tableName = work.getLoadTableDesc().getTable().getTableName();
- } else if (work.getTableSpecs() != null) {
- tableName = work.getTableSpecs().tableName;
- } else {
- tableName = work.getLoadFileDesc().getDestinationCreateTable();
- }
- Table table;
- try {
- table = db.getTable(tableName);
- } catch (HiveException e) {
- LOG.warn("Failed to get table:" + tableName);
- // For CTAS query, table does not exist in this period, just use table name as prefix.
- return tableName.toLowerCase();
- }
- return table.getDbName() + "." + table.getTableName();
- }
-
- private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
-
- List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
- if (childTasks == null) {
- return null;
- }
- for (Task<? extends Serializable> task : childTasks) {
- if (task instanceof StatsTask) {
- return (StatsTask) task;
- } else {
- Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
- if (childTask instanceof StatsTask) {
- return (StatsTask) childTask;
- } else {
- continue;
- }
- }
- }
-
- return null;
- }
-
- private List<Map<String, String>> getPartitionSpecs(StatsWork work) throws HiveException {
- if (work.getLoadFileDesc() != null) {
- return null; //we are in CTAS, so we know there are no partitions
- }
- Table table;
- List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
-
- if (work.getTableSpecs() != null) {
-
- // ANALYZE command
- TableSpec tblSpec = work.getTableSpecs();
- table = tblSpec.tableHandle;
- if (!table.isPartitioned()) {
- return null;
- }
- // get all partitions that matches with the partition spec
- List<Partition> partitions = tblSpec.partitions;
- if (partitions != null) {
- for (Partition partition : partitions) {
- partitionSpecs.add(partition.getSpec());
- }
- }
- } else if (work.getLoadTableDesc() != null) {
-
- // INSERT OVERWRITE command
- LoadTableDesc tbd = work.getLoadTableDesc();
- table = db.getTable(tbd.getTable().getTableName());
- if (!table.isPartitioned()) {
- return null;
- }
- DynamicPartitionCtx dpCtx = tbd.getDPCtx();
- if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
- // we could not get dynamic partition information before SparkTask execution.
- } else { // static partition
- partitionSpecs.add(tbd.getPartitionSpec());
- }
- }
- return partitionSpecs;
- }
-
private Map<String, List<String>> getOperatorCounters() {
String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
Map<String, List<String>> counters = new HashMap<String, List<String>>();