HIVE-12515: Clean up the SparkCounters-related code after removing counter-based stats collection [Spark Branch] (Rui, reviewed by Chengxiang & Xuefu)

Conflicts:
        ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72e070f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72e070f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72e070f4

Branch: refs/heads/master
Commit: 72e070f432db3eadc9b6d538157714cf8f47fca1
Parents: 4a41ba5
Author: Rui Li <[email protected]>
Authored: Thu Dec 3 16:37:05 2015 +0800
Committer: Rui Li <[email protected]>
Committed: Thu Jan 28 14:49:12 2016 +0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   4 -
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 146 +------------------
 2 files changed, 1 insertion(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4df7d25..ec6a2c7 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1091,8 +1091,6 @@ spark.query.files=add_part_multiple.q, \
   stats7.q, \
   stats8.q, \
   stats9.q, \
-  stats_counter.q, \
-  stats_counter_partitioned.q, \
   stats_noscan_1.q, \
   stats_noscan_2.q, \
   stats_only_null.q, \
@@ -1298,8 +1296,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   schemeAuthority2.q,\
   scriptfile1.q,\
   scriptfile1_win.q,\
-  stats_counter.q,\
-  stats_counter_partitioned.q,\
   temp_table_external.q,\
   truncate_column_buckets.q,\
   uber_reduce.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index faa075a..26cce1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,11 +28,7 @@ import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -44,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -58,25 +51,15 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
@@ -86,7 +69,6 @@ public class SparkTask extends Task<SparkWork> {
   private static final LogHelper console = new LogHelper(LOG);
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final long serialVersionUID = 1L;
-  private SparkCounters sparkCounters;
 
   @Override
  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext,
@@ -106,7 +88,7 @@ public class SparkTask extends Task<SparkWork> {
       sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
 
       SparkWork sparkWork = getWork();
-      sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+      sparkWork.setRequiredCounterPrefix(getOperatorCounters());
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -116,8 +98,6 @@ public class SparkTask extends Task<SparkWork> {
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (rc == 0) {
-        sparkCounters = sparkJobStatus.getCounter();
-        // for RSC, we should get the counters after job has finished
         SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
          LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -233,10 +213,6 @@ public class SparkTask extends Task<SparkWork> {
     return ((ReduceWork) children.get(0)).getReducer();
   }
 
-  public SparkCounters getSparkCounters() {
-    return sparkCounters;
-  }
-
   /**
    * Set the number of reducers for the spark work.
    */
@@ -250,126 +226,6 @@ public class SparkTask extends Task<SparkWork> {
    console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
 
-  private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
-    Map<String, List<String>> counters = getOperatorCounters();
-    StatsTask statsTask = getStatsTaskInChildTasks(this);
-    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    // fetch table prefix if SparkTask try to gather table statistics based on counter.
-    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
-      List<String> prefixes = getRequiredCounterPrefix(statsTask);
-      for (String prefix : prefixes) {
-        List<String> counterGroup = counters.get(prefix);
-        if (counterGroup == null) {
-          counterGroup = new LinkedList<String>();
-          counters.put(prefix, counterGroup);
-        }
-        counterGroup.add(StatsSetupConst.ROW_COUNT);
-        counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
-      }
-    }
-    return counters;
-  }
-
-  private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
-    List<String> prefixs = new LinkedList<String>();
-    StatsWork statsWork = statsTask.getWork();
-    String tablePrefix = getTablePrefix(statsWork);
-    List<Map<String, String>> partitionSpecs = getPartitionSpecs(statsWork);
-
-    if (partitionSpecs == null) {
-      prefixs.add(tablePrefix.endsWith(Path.SEPARATOR) ? tablePrefix : tablePrefix + Path.SEPARATOR);
-    } else {
-      for (Map<String, String> partitionSpec : partitionSpecs) {
-        String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec));
-        prefixs.add(prefixWithPartition.endsWith(Path.SEPARATOR) ? prefixWithPartition : prefixWithPartition + Path.SEPARATOR);
-      }
-    }
-
-    return prefixs;
-  }
-
-  private String getTablePrefix(StatsWork work) throws HiveException {
-      String tableName;
-      if (work.getLoadTableDesc() != null) {
-        tableName = work.getLoadTableDesc().getTable().getTableName();
-      } else if (work.getTableSpecs() != null) {
-        tableName = work.getTableSpecs().tableName;
-      } else {
-        tableName = work.getLoadFileDesc().getDestinationCreateTable();
-      }
-    Table table;
-    try {
-      table = db.getTable(tableName);
-    } catch (HiveException e) {
-      LOG.warn("Failed to get table:" + tableName);
-      // For CTAS query, table does not exist in this period, just use table name as prefix.
-      return tableName.toLowerCase();
-    }
-    return table.getDbName() + "." + table.getTableName();
-  }
-
-  private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
-
-    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-    if (childTasks == null) {
-      return null;
-    }
-    for (Task<? extends Serializable> task : childTasks) {
-      if (task instanceof StatsTask) {
-        return (StatsTask) task;
-      } else {
-        Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
-        if (childTask instanceof StatsTask) {
-          return (StatsTask) childTask;
-        } else {
-          continue;
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private List<Map<String, String>> getPartitionSpecs(StatsWork work) throws HiveException {
-    if (work.getLoadFileDesc() != null) {
-      return null; //we are in CTAS, so we know there are no partitions
-    }
-    Table table;
-    List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
-
-    if (work.getTableSpecs() != null) {
-
-      // ANALYZE command
-      TableSpec tblSpec = work.getTableSpecs();
-      table = tblSpec.tableHandle;
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      // get all partitions that matches with the partition spec
-      List<Partition> partitions = tblSpec.partitions;
-      if (partitions != null) {
-        for (Partition partition : partitions) {
-          partitionSpecs.add(partition.getSpec());
-        }
-      }
-    } else if (work.getLoadTableDesc() != null) {
-
-      // INSERT OVERWRITE command
-      LoadTableDesc tbd = work.getLoadTableDesc();
-      table = db.getTable(tbd.getTable().getTableName());
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-        // we could not get dynamic partition information before SparkTask execution.
-      } else { // static partition
-        partitionSpecs.add(tbd.getPartitionSpec());
-      }
-    }
-    return partitionSpecs;
-  }
-
   private Map<String, List<String>> getOperatorCounters() {
    String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     Map<String, List<String>> counters = new HashMap<String, List<String>>();
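
Editor's note: after this change the only counters registered with the SparkWork are the per-operator counters from getOperatorCounters(); the table/partition stats prefixes built by the removed getCounterPrefixes()/getRequiredCounterPrefix() methods (ROW_COUNT, RAW_DATA_SIZE) are gone. Below is a minimal, self-contained Java sketch of the shape of the map that setRequiredCounterPrefix(...) now receives; the group and counter names are placeholders for illustration (the real group comes from HiveConf.ConfVars.HIVECOUNTERGROUP and the names from the operators in the plan), not Hive's actual code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OperatorCounterSketch {
  public static void main(String[] args) {
    // Stand-in for HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
    // "HIVE" is a placeholder group name.
    String groupName = "HIVE";

    // Shape of the map built by getOperatorCounters(): counter group -> counter names.
    Map<String, List<String>> counters = new HashMap<String, List<String>>();
    counters.put(groupName, Arrays.asList("CREATED_FILES", "RECORDS_OUT")); // placeholder names

    // After HIVE-12515 this map is handed over unchanged:
    //   sparkWork.setRequiredCounterPrefix(getOperatorCounters());
    // with no table/partition stats prefixes mixed in.
    for (Map.Entry<String, List<String>> e : counters.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}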
