[
https://issues.apache.org/jira/browse/SPARK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045652#comment-14045652
]
Bharath Ravi Kumar edited comment on SPARK-2292 at 6/27/14 7:19 AM:
--------------------------------------------------------------------
My apologies, the cause was mapToPair and not reduceByKey. I was misled by the line
number associated with the execution stage. Here's the entire (cleaned-up) code,
copied from the updated gist:
{code}
package com.acme;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.io.File;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple4;

public class Test {

    @Parameter(required = true, names = "-sparkMaster", description = "The URL of the spark master in the form spark://host:port", arity = 1)
    private String sparkMasterURL;

    @Parameter(names = "-sparkHome", required = true, description = "Location of the local spark installation directory", arity = 1)
    private String sparkHome;

    @Parameter(names = "-dataFile", required = true, description = "The local file to pick up input data from", arity = 1)
    private String dataFile;

    @Parameter(names = "-numPartitions", required = false, description = "The number of spark data partitions to split across", arity = 1)
    private int numPartitions = 80;

    @Parameter(names = "-jarFileLoc", required = true, description = "The location of the jar file containing the application code with its non-spark dependencies", arity = 1)
    private String jarFileLoc;

    private static final Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        Test regressionDemo = new Test();
        JCommander cmdParser = new JCommander(regressionDemo);
        try {
            cmdParser.parse(args);
            regressionDemo.train();
        } catch (ParameterException exception) {
            System.err.println("Exception parsing one or more command line arguments " + exception.getMessage());
            cmdParser.usage();
        }
    }

    public void train() throws Exception {
        SparkConf conf = new SparkConf()
                .setMaster(sparkMasterURL)
                .setAppName("SparkRegressionExample")
                .setSparkHome(sparkHome)
                .setJars(new String[]{jarFileLoc});
        conf.set("spark.executor.memory", "1500M");
        conf.set("spark.default.parallelism", "1");
        conf.set("spark.cores.max", "20");
        conf.set("spark.storage.memoryFraction", "0.9");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Parse the raw tab-separated lines into (Long, Long, Integer, Integer) tuples.
        JavaRDD<String> rawData = createRawRDD(ctx, new File(dataFile));
        LineSplitter splitter = new LineSplitter();
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> parsedRecords = rawData.map(splitter);
        logger.info("Raw records size: " + parsedRecords.count());

        JavaRDD<Tuple4<Long, Long, Integer, Integer>> reducedTupleRecords = parsedRecords.coalesce(numPartitions);
        logger.info("Filtered records size: " + reducedTupleRecords.count());

        // The NPE described in this issue is triggered when this mapToPair stage runs.
        TupleToPairMapper tupleToPairMapper = new TupleToPairMapper();
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> recordsByTuple = reducedTupleRecords.mapToPair(tupleToPairMapper);
        logger.info("Size of records reduced by tuple: " + recordsByTuple.count());

        logger.info("Shutting down context...");
        ctx.stop();
    }

    private static JavaRDD<String> createRawRDD(JavaSparkContext ctx, File file) {
        return ctx.textFile(file.getAbsolutePath());
    }

    // Maps a parsed Tuple4 record to a ((x, y), (r, c)) key/value pair.
    private static class TupleToPairMapper implements PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) throws Exception {
            return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                    new Tuple2<Long, Long>(t._1(), t._2()),
                    new Tuple2<Integer, Integer>(t._3(), t._4()));
        }

        public TupleToPairMapper() {
        }
    }

    // Element-wise sum of the value pairs; unused in this cleaned-up repro.
    private static class KVReducer implements Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
            return new Tuple2<Integer, Integer>(t1._1() + t2._1(), t1._2() + t2._2());
        }

        public KVReducer() {
        }
    }

    // Splits a tab-separated line of the form "x#y#...<TAB>...#r#c" into a Tuple4.
    private static class LineSplitter implements Function<String, Tuple4<Long, Long, Integer, Integer>> {
        @Override
        public Tuple4<Long, Long, Integer, Integer> call(String line) throws Exception {
            String[] dataPoint = line.trim().split("\\t");
            String[] fArr = dataPoint[0].split("#");
            Long x = Long.valueOf(fArr[0].trim());
            Long y = Long.valueOf(fArr[1].trim());
            String[] lArr = dataPoint[1].split("#");
            int r = Integer.valueOf(lArr[1].trim());
            int c = Integer.valueOf(lArr[2].trim());
            return new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
        }
    }
}
{code}
Input data:
{noformat}
1028#29922#89575#kuv6kvghgk1337d86d2111774 1#1#0
{noformat}
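For reference, here is a minimal standalone sketch (not part of the gist) that applies the
same parsing logic to the sample record above, assuming the whitespace in it is a tab; it
prints the key/value pair that TupleToPairMapper would emit:
{code}
// Hypothetical standalone check of the LineSplitter/TupleToPairMapper logic (no Spark involved).
public class ParseCheck {
    public static void main(String[] args) {
        // Assumes the separator between the two fields of the sample record is a tab.
        String line = "1028#29922#89575#kuv6kvghgk1337d86d2111774\t1#1#0";
        String[] dataPoint = line.trim().split("\\t");
        String[] fArr = dataPoint[0].split("#");
        String[] lArr = dataPoint[1].split("#");
        long x = Long.parseLong(fArr[0].trim());  // 1028
        long y = Long.parseLong(fArr[1].trim());  // 29922
        int r = Integer.parseInt(lArr[1].trim()); // 1
        int c = Integer.parseInt(lArr[2].trim()); // 0
        // The pair handed to mapToPair would be ((1028, 29922), (1, 0)).
        System.out.println("key=(" + x + "," + y + ") value=(" + r + "," + c + ")");
    }
}
{code}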
> NullPointerException in JavaPairRDD.mapToPair
> ---------------------------------------------
>
> Key: SPARK-2292
> URL: https://issues.apache.org/jira/browse/SPARK-2292
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.0.0
> Environment: Spark 1.0.0, Standalone with a single slave running with
> 4G memory allocation. The data file was
> Reporter: Bharath Ravi Kumar
> Priority: Critical
>
> Correction: Invoking JavaPairRDD.mapToPair results in an NPE:
> {noformat}
> 14/06/26 21:05:35 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
> java.lang.NullPointerException
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
> {noformat}
> This occurs only after migrating to the 1.0.0 API. The details of the code and
> the data file used for testing are included in this gist:
> https://gist.github.com/reachbach/d8977c8eb5f71f889301
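For anyone comparing against the pre-1.0 code path, a rough sketch of the Java API change
referred to above (illustrative only, not taken from the gist): before 1.0.0 a PairFunction
was passed to map(), which returned a JavaPairRDD directly; with 1.0.0 the same transformation
goes through mapToPair. The sketch reuses the TupleToPairMapper and RDD types from the repro
code above.
{code}
// Illustrative only: the shape of the Java API change when migrating to 1.0.0.
static JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> toPairs(
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> records) {
    // Pre-1.0 (0.9.x) style -- map() accepted a PairFunction and returned a JavaPairRDD:
    //     return records.map(new TupleToPairMapper());
    // 1.0.0 style -- the PairFunction must now go through mapToPair():
    return records.mapToPair(new TupleToPairMapper());
}
{code}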
--
This message was sent by Atlassian JIRA
(v6.2#6252)