Re: Spark Streaming fails with unable to get records after polling for 512 ms

2017-11-15 Thread jagadish kagitala
Hi Cody,

It worked after moving the parameter to SparkConf; I don't see that error anymore.
But now the count for each RDD returns 0, even though there are records in the
topic I'm reading.
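
For reference, this is roughly what moving the poll timeout onto SparkConf looks
like on my side (the 12000 is just the value from my earlier commented-out code):

SparkConf sparkConf = new SparkConf()
    .setAppName("JavaFlingerSparkApplication")
    .setMaster("local[*]")
    .set("spark.streaming.kafka.consumer.poll.ms", "12000");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));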

Do you see anything wrong with how I'm creating the direct stream?

Thanks
Jagadish

On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger  wrote:

> spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
> a Kafka parameter.
>
> see http://spark.apache.org/docs/latest/configuration.html
>
> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala  wrote:
> > Hi,
> >
> > I'm trying to add Spark Streaming to our Kafka topic, but I keep getting
> > this error:
> > java.lang.AssertionError: assertion failed: Failed to get record after
> > polling for 512 ms.
> >
> > I tried setting different params, like max.poll.interval.ms and
> > spark.streaming.kafka.consumer.poll.ms to 1ms, in kafkaParams.
> > But I still get "failed to get records after 512 ms"; adding the above
> > params doesn't seem to change the polling time at all.
> >
> > Without Spark Streaming I'm able to fetch the records; only with the
> > Spark Streaming add-on do I get this error.
> >
> > Any help is greatly appreciated. Below is the code I'm using.
> >
> > SparkConf sparkConf = new SparkConf()
> >     .setAppName("JavaFlingerSparkApplication")
> >     .setMaster("local[*]");
> > JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
> >
> > kafkaParams.put("bootstrap.servers", hosts);
> > kafkaParams.put("group.id", groupid);
> > kafkaParams.put("auto.commit.enable", false);
> > kafkaParams.put("key.deserializer", StringDeserializer.class);
> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
> > kafkaParams.put("auto.offset.reset", "earliest");
> > //kafkaParams.put("max.poll.interval.ms", 12000);
> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> > //kafkaParams.put("request.timeout.ms", 12000);
> >
> >
> > JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
> >     KafkaUtils.createDirectStream(ssc,
> >         LocationStrategies.PreferConsistent(),
> >         ConsumerStrategies.<String, Bytes>Subscribe(topics, kafkaParams));
> > messages.foreachRDD(rdd -> {
> >     List<ConsumerRecord<String, Bytes>> input = rdd.collect();
> >     System.out.println("count is " + input.size());
> > });
> > ssc.start();
> > ssc.awaitTermination();
> >
> > Thanks
> > Jagadish
> >
> >
> >
> >
>


Access to Applications metrics

2017-11-15 Thread Nick Dimiduk
Hello,

I'm wondering if it's possible to get access to the detailed job/stage/task
level metrics via the metrics system (JMX, Graphite, etc.). I've enabled the
wildcard sink and I do not see them. It seems these values are only available
over HTTP/JSON and to SparkListener instances; is this the case? Has anyone
worked on a SparkListener that would bridge data from one to the other?
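
For context, the kind of bridge I have in mind is roughly the following sketch
(class and metric names are made up, and whether the resulting Dropwizard
registry can be wired into Spark's configured sinks, or needs its own reporter,
is something I have not verified):

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import com.codahale.metrics.MetricRegistry;

// Forwards per-task metrics from the listener bus into a Dropwizard registry.
public class TaskMetricsBridge extends SparkListener {
    private final MetricRegistry registry;

    public TaskMetricsBridge(MetricRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        if (taskEnd.taskMetrics() == null) return;
        // Key the histograms by stage so per-stage behavior stays visible.
        String prefix = "stage_" + taskEnd.stageId();
        registry.histogram(prefix + ".taskRunTimeMs")
                .update(taskEnd.taskMetrics().executorRunTime());
        registry.histogram(prefix + ".jvmGcTimeMs")
                .update(taskEnd.taskMetrics().jvmGCTime());
    }
}

It could be registered with sparkContext.addSparkListener(new TaskMetricsBridge(registry)),
or via spark.extraListeners if given a no-arg constructor.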

Thanks,
Nick


[Spark Core]: S3a with Openstack swift object storage not using credentials provided in sparkConf

2017-11-15 Thread Marius

Hey,

I am currently using Spark 2.2.0 for Hadoop 2.7.x in a standalone
cluster for testing. I want to access some files and share them with the
nodes on the cluster using addFiles. As local directories are not
supported for this, I want to use S3 to do the job.


In contrast to nearly everything I have found on the internet, I am using
a self-hosted OpenStack cluster with Swift as the object storage. Accessing
Swift directly would be fine too, but all the tutorials I have found seem
to use Keystone v2, whereas our deployment uses v3.


I added the following jars to the classpath of each executor and the driver:

aws-java-sdk-1.7.4.jar

hadoop-aws-2.7.3.jar

When I try to access an S3 bucket, the following exception occurs:
"Unable to load AWS credentials from any provider in the chain"


This is my config:

conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.endpoint", "https://foo/swift/v1;)
conf.set("fs.s3a.access.key", System.getenv("s3Access"))
conf.set("fs.s3a.secret.key", System.getenv("s3Secret"))

From my understanding, the S3A handler is not using the provided credentials.
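
One thing I am not sure about (purely an assumption on my part): the impl key
above carries the spark.hadoop. prefix, but the endpoint and credential keys do
not, so I wonder whether they ever reach the Hadoop configuration when set on
SparkConf. The prefixed variant I could try would look like this:

conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("spark.hadoop.fs.s3a.endpoint", "https://foo/swift/v1")
conf.set("spark.hadoop.fs.s3a.access.key", System.getenv("s3Access"))
conf.set("spark.hadoop.fs.s3a.secret.key", System.getenv("s3Secret"))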

Does anyone have an idea how to fix this?


Cheers and thanks in Advance

Marius



Parquet files from spark not readable in Cascading

2017-11-15 Thread Vikas Gandham
Hi,



When I tried reading Parquet data that was generated by Spark in Cascading,
it threw the following error:







Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file ""
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:103)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)
    at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)
    at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)
    at cascading.util.Util.retry(Util.java:1044)
    at cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)
    at org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)
    at org.apache.parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:83)
    at org.apache.parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:77)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:293)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)



This is mostly seen when parquet has nested structures.



I didn't find any solution to this.



I see some JIRA issues like https://issues.apache.org/jira/browse/SPARK-10434
(Parquet compatibility/interoperability issues), where Parquet files generated
by Spark 1.5 could not be read in Spark 1.4. That was fixed in later Spark
versions, but was it fixed in Cascading?



I'm not sure whether this has something to do with the Parquet version, whether
Cascading has a bug, or whether Spark is writing the Parquet files in a way that
Cascading does not accept.
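
For what it's worth, one write-side experiment I am considering (an assumption on
my part, not something confirmed to fix the Cascading read) is re-writing the files
with Spark's legacy Parquet layout for nested types and retrying the read:

SparkSession spark = SparkSession.builder()
    .appName("ParquetLegacyWriteTest")
    .config("spark.sql.parquet.writeLegacyFormat", "true")  // changes how nested types are written
    .getOrCreate();

// Placeholder paths; re-write the existing Spark output with the legacy layout.
spark.read().parquet("/path/to/spark-output")
     .write().parquet("/path/to/spark-output-legacy");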



Note: I am trying to read the Parquet files with an Avro schema in Cascading.



I have posted on the Cascading mailing list too.





-- 
Thanks
Vikas Gandham


Re: Spark Streaming fails with unable to get records after polling for 512 ms

2017-11-15 Thread Cody Koeninger
spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
a Kafka parameter.

see http://spark.apache.org/docs/latest/configuration.html

On Tue, Nov 14, 2017 at 8:56 PM, jkagitala  wrote:
> Hi,
>
> I'm trying to add Spark Streaming to our Kafka topic, but I keep getting
> this error:
> java.lang.AssertionError: assertion failed: Failed to get record after
> polling for 512 ms.
>
> I tried setting different params, like max.poll.interval.ms and
> spark.streaming.kafka.consumer.poll.ms to 1ms, in kafkaParams.
> But I still get "failed to get records after 512 ms"; adding the above
> params doesn't seem to change the polling time at all.
>
> Without Spark Streaming I'm able to fetch the records; only with the
> Spark Streaming add-on do I get this error.
>
> Any help is greatly appreciated. Below is the code I'm using.
>
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaFlingerSparkApplication").setMaster("local[*]");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(10));
>
> kafkaParams.put("bootstrap.servers", hosts);
> kafkaParams.put("group.id", groupid);
> kafkaParams.put("auto.commit.enable", false);
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", BytesDeserializer.class);
> kafkaParams.put("auto.offset.reset", "earliest");
> //kafkaParams.put("max.poll.interval.ms", 12000);
> //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> //kafkaParams.put("request.timeout.ms", 12000);
>
>
> JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
>     KafkaUtils.createDirectStream(ssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.<String, Bytes>Subscribe(topics, kafkaParams));
> messages.foreachRDD(rdd -> {
>     List<ConsumerRecord<String, Bytes>> input = rdd.collect();
>     System.out.println("count is " + input.size());
> });
> ssc.start();
> ssc.awaitTermination();
>
> Thanks
> Jagadish
>
>
>
>




Re: Process large JSON file without causing OOM

2017-11-15 Thread Alec Swan
Thanks Steve and Vadim for the feedback.

@Steve, are you suggesting creating a custom receiver and somehow piping it
through Spark Streaming/Spark SQL? Or are you suggesting creating smaller
datasets from the stream and using my original code to process those smaller
datasets? It'd be very helpful for a novice like myself if you could provide
code samples or links to docs/articles.

@Vadim, I ran my test with local[1] and got an OOM in the same place. What
puzzles me is that when I inspect the heap dump with VisualVM (see below), it
says that the heap is pretty small, ~35MB. I am running my test with
"-Xmx10G -Dspark.executor.memory=6g -Dspark.driver.memory=6g" JVM opts and
I can see them reflected in the Spark UI. Am I missing some memory settings?

Date taken: Wed Nov 15 10:46:06 MST 2017
File: /tmp/java_pid69786.hprof
File size: 59.5 MB

Total bytes: 39,728,337
Total classes: 15,749
Total instances: 437,979
Classloaders: 123
GC roots: 2,831
Number of objects pending for finalization: 5,198


Thanks,

Alec

On Wed, Nov 15, 2017 at 11:15 AM, Vadim Semenov  wrote:

> There's a lot of off-heap memory involved in decompressing Snappy,
> compressing ZLib.
>
> Since you're running using `local[*]`, you process multiple tasks
> simultaneously, so they all might consume memory.
>
> I don't think that increasing heap will help, since it looks like you're
> hitting system memory limits.
>
> I'd suggest trying to run with `local[2]` and checking what's the memory
> usage of the jvm process.
>
> On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan  wrote:
>
>> Hello,
>>
>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>> format. Effectively, my Java service starts up an embedded Spark cluster
>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>> keep getting OOM errors with large (~1GB) files.
>>
>> I've tried different ways to reduce memory usage, e.g. by partitioning
> >> data with dataSet.partitionBy("customer").save(filePath), or capping
> >> memory usage by setting spark.executor.memory=1G, but to no avail.
>>
>> I am wondering if there is a way to avoid OOM besides splitting the
>> source JSON file into multiple smaller ones and processing the small ones
>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> >> in its entirety before converting it to ORC (columnar)? If so, would it
>> make sense to create a custom receiver that reads the Snappy file and use
>> Spark streaming for ORC conversion?
>>
>> Thanks,
>>
>> Alec
>>
>
>


Restart Spark Streaming after deployment

2017-11-15 Thread KhajaAsmath Mohammed
Hi,

I am new to Spark Streaming. I have developed one Spark Streaming job which
runs every 30 minutes with a checkpointing directory.

I have to implement a minor change. Shall I kill the Spark Streaming job once
the current batch is completed, using the yarn application -kill command, and
then update the jar file?

The question I have is: if I follow the above approach, will Spark Streaming
pick up data from the offsets saved in the checkpoint after the restart?

Are there any better approaches? Thanks in advance for your suggestions.
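
For reference, the checkpoint-based restart pattern I understand from the docs
looks roughly like this (names and paths are placeholders, and whether it applies
to my job is an assumption); I also understand that recovery from a checkpoint may
not work if the application code in the jar has changed, which is exactly the case
I am asking about:

// Restores the context (and saved offsets) from the checkpoint directory if it
// exists; otherwise builds a fresh context via the factory function.
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
    "hdfs:///checkpoints/my-streaming-job",
    () -> {
        JavaStreamingContext newSsc =
            new JavaStreamingContext(sparkConf, Durations.minutes(30));
        newSsc.checkpoint("hdfs:///checkpoints/my-streaming-job");
        // build the DStream graph here
        return newSsc;
    });
ssc.start();
ssc.awaitTermination();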

Thanks,
Asmath


Re: Process large JSON file without causing OOM

2017-11-15 Thread Vadim Semenov
There's a lot of off-heap memory involved in decompressing Snappy,
compressing ZLib.

Since you're running using `local[*]`, you process multiple tasks
simultaneously, so they all might consume memory.

I don't think that increasing heap will help, since it looks like you're
hitting system memory limits.

I'd suggest trying to run with `local[2]` and checking what's the memory
usage of the jvm process.

On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan  wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer").save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no avail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in its entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>


Re: Process large JSON file without causing OOM

2017-11-15 Thread Steve Loughran


On 14 Nov 2017, at 15:32, Alec Swan wrote:

But I wonder if there is a way to stream/batch the content of the JSON file in
order to convert it to ORC piecemeal and avoid reading the whole JSON file into
memory in the first place?




That is what you'll need to do; you'd hit similar problems if you had the same 
files, same allocated JVM space and the same # of threads trying to read in the 
files.

Jackson has a streaming API: http://www.baeldung.com/jackson-streaming-api
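
For illustration, a rough sketch of that idea for a large top-level JSON array,
with one record materialized at a time (the class name, field name and array
layout are assumptions):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;

public class StreamJsonRecords {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try (JsonParser parser = new JsonFactory().createParser(new File(args[0]))) {
            if (parser.nextToken() != JsonToken.START_ARRAY) {
                throw new IllegalStateException("expected a top-level JSON array");
            }
            // Advance object by object; only one record is in memory at a time.
            while (parser.nextToken() == JsonToken.START_OBJECT) {
                JsonNode record = mapper.readTree(parser);
                // e.g. buffer into small batches here and hand each batch to Spark SQL
                System.out.println(record.get("id"));   // hypothetical field
            }
        }
    }
}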


spark structured csv file stream not detecting new files

2017-11-15 Thread Imran Rajjad
Greetings,
I am running a unit test designed to stream a folder into which I am manually
copying CSV files. The files do not always get picked up; they only get
detected when the job starts with the files already in the folder.

I even tried using the fileNameOnly option newly included in 2.2.0. Have I
missed something in the documentation? This problem does not seem to occur
in the DStreams examples.


DataStreamReader reader = spark.readStream()
        .option("fileNameOnly", true)
        .option("header", true)
        .schema(userSchema);

Dataset<Row> csvDF = reader.csv(watchDir);

Dataset<Row> results = csvDF.groupBy("myCol").count();
MyForEach forEachObj = new MyForEach();
query = results
        .writeStream()
        .foreach(forEachObj) // foreach never gets called
        .outputMode("complete")
        .start();
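
One thing I am not sure about (an assumption on my part) is whether my unit test
simply returns before the next micro-batch trigger fires for files copied in
later. A sketch of how I could block on the query before asserting:

query.processAllAvailable();        // or: query.awaitTermination(60000);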

-- 
I.R