Author: xuefu
Date: Sat Oct 31 02:33:28 2015
New Revision: 1711569
URL: http://svn.apache.org/viewvc?rev=1711569&view=rev
Log:
PIG-4634: Fix records count issues in output statistics (Xianda via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Sat Oct 31 02:33:28 2015
@@ -109,7 +109,9 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -149,7 +151,7 @@ public class SparkLauncher extends Launc
explain(sparkplan, System.out, "text", true);
SparkPigStats sparkStats = (SparkPigStats) pigContext
.getExecutionEngine().instantiatePigStats();
- sparkStats.initialize(sparkplan);
+ sparkStats.initialize(pigContext, sparkplan);
PigStats.start(sparkStats);
startSparkIfNeeded(pigContext);
@@ -178,6 +180,8 @@ public class SparkLauncher extends Launc
byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+        SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext));
+
// Create conversion map, mapping between pig operator and spark
convertor
Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
= new HashMap<Class<? extends PhysicalOperator>,
RDDConverter>();
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
Sat Oct 31 02:33:28 2015
@@ -21,6 +21,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import scala.Tuple2;
import org.apache.commons.logging.Log;
@@ -68,9 +72,13 @@ public class StoreConverter implements
POStore op) throws IOException {
SparkUtil.assertPredecessorSize(predecessors, op, 1);
RDD<Tuple> rdd = predecessors.get(0);
+
+        SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
+ SparkStatsUtil.getStoreSparkCounterName(op));
+
// convert back to KV pairs
JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
- new FromTupleFunction());
+ buildFromTupleFunction(op));
PairRDDFunctions<Text, Tuple> pairRDDFunctions = new
PairRDDFunctions<Text, Tuple>(
rddPairs.rdd(), SparkUtil.getManifest(Text.class),
@@ -103,6 +111,7 @@ public class StoreConverter implements
return retRdd;
}
+
private static POStore configureStorer(JobConf jobConf,
PhysicalOperator op) throws IOException {
ArrayList<POStore> storeLocations = Lists.newArrayList();
@@ -125,9 +134,39 @@ public class StoreConverter implements
Function<Tuple, Tuple2<Text, Tuple>> {
private static Text EMPTY_TEXT = new Text();
+ private String counterGroupName;
+ private String counterName;
+ private SparkCounters sparkCounters;
+
public Tuple2<Text, Tuple> call(Tuple v1) {
+ if (sparkCounters != null) {
+ sparkCounters.increment(counterGroupName, counterName, 1L);
+ }
return new Tuple2<Text, Tuple>(EMPTY_TEXT, v1);
}
+
+ public void setCounterGroupName(String counterGroupName) {
+ this.counterGroupName = counterGroupName;
+ }
+
+ public void setCounterName(String counterName) {
+ this.counterName = counterName;
+ }
+
+ public void setSparkCounters(SparkCounters sparkCounter) {
+ this.sparkCounters = sparkCounter;
+ }
+ }
+
+ private FromTupleFunction buildFromTupleFunction(POStore op) {
+ FromTupleFunction ftf = new FromTupleFunction();
+ if (!op.isTmpStore()) {
+ ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
+ ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op));
+            SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
+ ftf.setSparkCounters(counterReporter.getCounters());
+ }
+ return ftf;
}
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
(original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Sat
Oct 31 02:33:28 2015
@@ -160,7 +160,7 @@ public class PigStatsUtil {
private static final String SEPARATOR = "/";
private static final String SEMICOLON = ";";
- private static String getShortName(String uri) {
+ public static String getShortName(String uri) {
int scolon = uri.indexOf(SEMICOLON);
int slash;
if (scolon!=-1) {
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
Sat Oct 31 02:33:28 2015
@@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp
import java.util.List;
import java.util.Map;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import scala.Option;
import com.google.common.collect.Maps;
@@ -57,17 +58,19 @@ public class SparkJobStats extends JobSt
public void addOutputInfo(POStore poStore, boolean success,
JobMetricsListener jobMetricsListener,
Configuration conf) {
- // TODO: Compute #records
- long bytes = getOutputSize(poStore, conf);
-        OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
- bytes, 1, success);
- outputStats.setPOStore(poStore);
- outputStats.setConf(conf);
if (!poStore.isTmpStore()) {
+ long bytes = getOutputSize(poStore, conf);
+            long recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
+            OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
+ bytes, recordsCount, success);
+ outputStats.setPOStore(poStore);
+ outputStats.setConf(conf);
+
outputs.add(outputStats);
}
}
+
public void collectStats(JobMetricsListener jobMetricsListener) {
if (jobMetricsListener != null) {
Map<String, List<TaskMetrics>> taskMetrics =
jobMetricsListener.getJobMetric(jobId);
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
Sat Oct 31 02:33:28 2015
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -49,8 +50,9 @@ public class SparkPigStats extends PigSt
this.sparkScriptState = (SparkScriptState) ScriptState.get();
}
- public void initialize(SparkOperPlan sparkPlan){
+ public void initialize(PigContext pigContext, SparkOperPlan sparkPlan){
super.start();
+ this.pigContext = pigContext;
sparkScriptState.setScriptInfo(sparkPlan);
}
@@ -133,17 +135,6 @@ public class SparkPigStats extends PigSt
}
@Override
- public Properties getPigProperties() {
- return null;
- }
-
- @Override
- public String getOutputAlias(String location) {
- // TODO
- return null;
- }
-
- @Override
public long getSMMSpillCount() {
throw new UnsupportedOperationException();
}
@@ -159,18 +150,6 @@ public class SparkPigStats extends PigSt
}
@Override
- public long getBytesWritten() {
- // TODO
- return 0;
- }
-
- @Override
- public long getRecordWritten() {
- // TODO
- return 0;
- }
-
- @Override
public int getNumberJobs() {
return jobPlan.size();
}
Added:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java?rev=1711569&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
(added)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
Sat Oct 31 02:33:28 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
+
+/**
+ * Just like PigStatusReporter which will create/reset Hadoop counters, SparkPigStatusReporter will
+ * create/reset Spark counters.
+ * Note that, it is not suitable to make SparkCounters as a Singleton, it will be created/reset for
+ * a given pig script or a Dump/Store action in Grunt mode.
+ */
+public class SparkPigStatusReporter {
+ private static SparkPigStatusReporter reporter;
+ private SparkCounters counters;
+
+ static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(SparkPigStatusReporter.class);
+ }
+
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ reporter = null;
+ }
+
+ private SparkPigStatusReporter() {
+ }
+
+ public static SparkPigStatusReporter getInstance() {
+ if (reporter == null) {
+ reporter = new SparkPigStatusReporter();
+ }
+ return reporter;
+ }
+
+ public void createCounter(String groupName, String counterName) {
+ if (counters != null) {
+ counters.createCounter(groupName, counterName);
+ }
+ }
+
+ public SparkCounters getCounters() {
+ return counters;
+ }
+
+ public void setCounters(SparkCounters counters) {
+ this.counters = counters;
+ }
+}
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Sat Oct 31 02:33:28 2015
@@ -22,12 +22,16 @@ import org.apache.hadoop.mapred.JobConf;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkStatsUtil {
+    public static final String SPARK_STORE_COUNTER_GROUP = "SparkStoreCounters";
+    public static final String SPARK_STORE_RECORD_COUNTER = "Output records in ";
+
public static void waitForJobAddStats(int jobID,
POStore poStore, SparkOperator
sparkOperator,
JobMetricsListener jobMetricsListener,
@@ -58,7 +62,25 @@ public class SparkStatsUtil {
sparkContext, jobConf, e);
}
- public static boolean isJobSuccess(int jobID,
+ public static String getStoreSparkCounterName(POStore store) {
+        String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName());
+
+ StringBuffer sb = new StringBuffer(SPARK_STORE_RECORD_COUNTER);
+ sb.append("_");
+ sb.append(store.getIndex());
+ sb.append("_");
+ sb.append(store.getOperatorKey());
+ sb.append("_");
+ sb.append(shortName);
+ return sb.toString();
+ }
+
+ public static long getStoreSparkCounterValue(POStore store) {
+ SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
+        return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store));
+ }
+
+ public static boolean isJobSuccess(int jobID,
JavaSparkContext sparkContext) {
JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
if (status == JobExecutionStatus.SUCCEEDED) {