Scala version changed in spark job

2018-01-24 Thread Fawze Abujaber
Hi all,

I upgraded my Hadoop cluster, which includes Spark 1.6.0, and noticed that
sometimes a job runs with Scala version 2.10.5 and sometimes with
2.10.4. Any idea why this is happening?
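
One way to check which Scala runtime the driver and each executor actually
load is to log the library version from inside the job. A minimal sketch,
assuming an existing SparkContext sc (e.g. in spark-shell); the parallelism
numbers are only illustrative:

// Scala library version seen by the driver.
println(s"driver Scala: ${scala.util.Properties.versionString}")

// Distinct Scala library versions seen on the executors.
val executorVersions = sc
  .parallelize(1 to 100, 10)
  .map(_ => scala.util.Properties.versionString)
  .distinct()
  .collect()
println(s"executor Scala: ${executorVersions.mkString(", ")}")

If the reported versions differ between runs, that usually points to different
scala-library jars ending up first on the classpath rather than anything
Spark-specific.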


Re: a way to allow spark job to continue despite task failures?

2018-01-24 Thread Sunita Arvind
I had a similar situation and landed on this question.
Finally I was able to make it do what I needed by cheating the Spark driver
:)
i.e. by setting a very high value for "--conf spark.task.maxFailures=800".
I set it to 800 deliberately; the default is typically 4. By the time 800
attempts for the failing tasks were done, the other tasks had completed.
You can set it to a higher or lower value depending on how many other tasks
you have and how long they take to complete.
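
For reference, here is a minimal sketch of the same setting applied
programmatically; the app name and the value 800 are illustrative, not a
recommendation:

import org.apache.spark.{SparkConf, SparkContext}

// Equivalent to passing --conf spark.task.maxFailures=800 to spark-submit.
// The default number of allowed failures per task is 4.
val conf = new SparkConf()
  .setAppName("lenient-job")            // hypothetical app name
  .set("spark.task.maxFailures", "800")
val sc = new SparkContext(conf)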

regards
Sunita

On Fri, Nov 13, 2015 at 4:50 PM, Ted Yu  wrote:

> I searched the code base and looked at:
> https://spark.apache.org/docs/latest/running-on-yarn.html
>
> I didn't find mapred.max.map.failures.percent or its counterpart.
>
> FYI
>
> On Fri, Nov 13, 2015 at 9:05 AM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>> Hi,
>>
>>
>> I know a task can fail 2 times and only the 3rd failure breaks the entire job.
>>
>> I am fine with this number of attempts.
>>
>> I would like the job to continue with the other tasks after a task has
>> been tried 3 times.
>>
>> The job can be marked "failed", but I want all tasks to run.
>>
>> Please see my use case.
>>
>>
>> I read a Hadoop input set, and some gzip files are incomplete. I would
>> like to just skip them, and the only way I see is to tell Spark to ignore
>> some permanently failing tasks, if that is possible. With traditional Hadoop
>> MapReduce this was possible using mapred.max.map.failures.percent.
>>
>>
>> Do MapReduce params like mapred.max.map.failures.percent apply to
>> Spark-on-YARN jobs?
>>
>> I edited $HADOOP_CONF_DIR/mapred-site.xml and
>> added mapred.max.map.failures.percent=30, but it does not seem to apply; the
>> job still failed after 3 failed task attempts.
>>
>>
>> Should Spark transmit this parameter? Or do the mapred.* settings not apply?
>>
>> Are other Hadoop parameters (e.g. the ones involved in reading the input,
>> rather than "processing" or "application" settings like max.map.failures)
>> taken into account and transmitted? I saw that Spark should scan
>> HADOOP_CONF_DIR and forward those, but I guess this does not apply to every
>> parameter, since Spark has its own distribution & DAG stage processing
>> logic, which just happens to have a YARN implementation.
>>
>>
>> Do you know a way to do this in Spark - to ignore a predefined number of
>> task failures but allow the job to continue? This way I could see all the
>> faulty input files in one job run, delete them all and continue with the
>> rest.
>>
>>
>> Just to mention, doing a manual gzip -t on top of hadoop cat is
>> infeasible, and map-reduce is far faster at scanning the 15K files totalling
>> 70 GB (it does 25 MB/s per node), while the old-style hadoop cat does much
>> less.
>>
>>
>> Thanks,
>>
>> Nicu
>>
>
>
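
As an aside on the corrupt-gzip use case quoted above: later Spark releases
added an option for skipping unreadable input files outright, which avoids
burning task attempts at all. A minimal sketch, assuming Spark 2.x+ and the
DataFrame reader (the path is hypothetical, and the option's behaviour should
be verified against your version's docs; it does not exist in the 1.x versions
discussed above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("skip-corrupt-gzip")                           // hypothetical app name
  .config("spark.sql.files.ignoreCorruptFiles", "true")   // skip files that fail to read
  .getOrCreate()

// Truncated or corrupt .gz parts are skipped instead of failing the task.
val logs = spark.read.textFile("hdfs:///data/logs/*.gz")  // hypothetical path
println(logs.count())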


CI/CD for spark and scala

2018-01-24 Thread Deepak Sharma
Hi All,
I just wanted to check whether there are any best practices around using CI/CD
for Spark/Scala projects running on AWS Hadoop clusters.
If there are any specific tools, please let me know.

-- 
Thanks
Deepak


Re: Providing Kafka configuration as Map of Strings

2018-01-24 Thread Cody Koeninger
Have you tried passing in a Map<String, Object> that happens to have a
String for all the values?  I haven't tested this, but the underlying
Kafka consumer constructor is documented to take either strings or
objects as values, despite the static type.
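
A minimal sketch of what that could look like, building the parameter map
straight from a java.util.Properties file and leaving every value as a String
(the file name is hypothetical, and whether the consumer accepts the string
form of every setting is exactly the open question above):

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

// Load plain-string Kafka settings from a properties file.
val props = new Properties()
props.load(new FileInputStream("kafka-consumer.properties"))  // hypothetical file

// Widen the values to Object without converting them; the idea is to let the
// Kafka consumer's own config parsing coerce "false", class names, etc.
val kafkaParams: Map[String, Object] =
  props.asScala.map { case (k, v) => k -> (v: Object) }.toMap

In the Scala API this map can then be handed to ConsumerStrategies.Subscribe;
for the Java API it would need an .asJava conversion first.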

On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain
 wrote:
> Basically, I am trying to avoid writing code like:
>
>   switch (key) {
>     case "key.deserializer":          result.put(key, Class.forName(value)); break;
>     case "key.serializer":            result.put(key, Class.forName(value)); break;
>     case "value.deserializer":        result.put(key, Class.forName(value)); break;
>     case "value.serializer":          result.put(key, Class.forName(value)); break;
>     case "max.partition.fetch.bytes": result.put(key, Long.valueOf(value));  break;
>     case "max.poll.interval.ms":      result.put(key, Long.valueOf(value));  break;
>     case "enable.auto.commit":        result.put(key, Boolean.valueOf(value)); break;
>     default:                          result.put(key, value); break;
>   }
>
> since I would need to go over all possible Kafka properties that are not
> expected as a String.
>
> On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain 
> wrote:
>>
>> On page
>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>> there is this Java example:
>>
>> Map<String, Object> kafkaParams = new HashMap<>();
>> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
>> kafkaParams.put("key.deserializer", StringDeserializer.class);
>> kafkaParams.put("value.deserializer", StringDeserializer.class);
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>> kafkaParams.put("auto.offset.reset", "latest");
>> kafkaParams.put("enable.auto.commit", false);
>>
>> Collection<String> topics = Arrays.asList("topicA", "topicB");
>>
>> JavaInputDStream<ConsumerRecord<String, String>> stream =
>>   KafkaUtils.createDirectStream(
>>     streamingContext,
>>     LocationStrategies.PreferConsistent(),
>>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>>   );
>>
>> I would like to configure Kafka from properties loaded from a Properties
>> file or a Map<String, String>.
>>
>> Is there any API to take a Map<String, String> and produce the
>> Map<String, Object> required to set the Kafka parameters? Such code would
>> convert "true" to a boolean, or a class name to the Class, depending on the
>> key.
>>
>> Seems to me that I would need to know ALL possible Kafka parameters and
>> what data type they should be converted to in order to produce the
>> Map<String, Object> kafkaParams.
>>
>> The older API used a Map<String, String> passed to
>> KafkaUtils.createDirectStream.
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Apache Hadoop and Spark

2018-01-24 Thread Mutahir Ali
Hello All,
Cordial Greetings,

I am trying to familiarize myself with Apache Hadoop and its different
software components and how they can be deployed on physical or virtual
infrastructure.

I have a few questions:

Q1) Can we use MapReduce and Apache Spark in the same cluster?
Q2) Is it mandatory to use GPUs for Apache Spark?
Q3) I read that Apache Spark is in-memory; will it benefit from SSD/flash for
caching or persistent storage?
Q4) If we intend to deploy a Hadoop cluster with 6 servers, can we have GPUs in
only two and restrict Apache Spark to those servers only?
Q5) Is it possible to virtualize Spark with GPU pass-through?
Q6) What GPUs are recommended / compatible with Apache Spark (NVIDIA M10 /
M60)?

Will be grateful for your suggestions and answers - please accept my apologies 
for totally noob questions 

Have a good day / evening ahead
Best



Sent from Outlook


Re: Providing Kafka configuration as Map of Strings

2018-01-24 Thread Tecno Brain
Basically, I am trying to avoid writing code like:

  switch (key) {
    case "key.deserializer":          result.put(key, Class.forName(value)); break;
    case "key.serializer":            result.put(key, Class.forName(value)); break;
    case "value.deserializer":        result.put(key, Class.forName(value)); break;
    case "value.serializer":          result.put(key, Class.forName(value)); break;
    case "max.partition.fetch.bytes": result.put(key, Long.valueOf(value));  break;
    case "max.poll.interval.ms":      result.put(key, Long.valueOf(value));  break;
    case "enable.auto.commit":        result.put(key, Boolean.valueOf(value)); break;
    default:                          result.put(key, value); break;
  }

since I would need to go over all possible Kafka properties that are not
expected as a String.

On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain 
wrote:

> On page
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> there is this Java example:
>
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", StringDeserializer.class);
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("enable.auto.commit", false);
>
> Collection<String> topics = Arrays.asList("topicA", "topicB");
>
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>   KafkaUtils.createDirectStream(
>     streamingContext,
>     LocationStrategies.PreferConsistent(),
>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>   );
>
> I would like to configure Kafka from properties loaded from a Properties
> file or a Map<String, String>.
>
> Is there any API to take a Map<String, String> and produce the
> Map<String, Object> required to set the Kafka parameters? Such code would
> convert "true" to a boolean, or a class name to the Class, depending on the
> key.
>
> Seems to me that I would need to know ALL possible Kafka parameters and
> what data type they should be converted to in order to produce the
> Map<String, Object> kafkaParams.
>
> The older API used a Map<String, String> passed to
> KafkaUtils.createDirectStream.
>
> Thanks
>
>
>
>
>
>
>


Providing Kafka configuration as Map of Strings

2018-01-24 Thread Tecno Brain
On page
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
there is this Java example:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

I would like to configure Kafka from properties loaded from a Properties
file or a Map<String, String>.

Is there any API to take a Map<String, String> and produce the
Map<String, Object> required to set the Kafka parameters? Such code would
convert "true" to a boolean, or a class name to the Class, depending on the
key.

Seems to me that I would need to know ALL possible Kafka parameters and
what data type they should be converted to in order to produce the
Map<String, Object> kafkaParams.

The older API used a Map<String, String> passed to
KafkaUtils.createDirectStream.

Thanks


Re: Spark Tuning Tool

2018-01-24 Thread Shmuel Blitz
Hi,

Which versions of Spark does the tool support?
Does the tool have any reference to the number of executor cores?

From your blog post it seems that this is a new feature on your service.
Are you offering the tool for download?

Shmuel

On Wed, Jan 24, 2018 at 7:02 PM, Timothy Chen  wrote:

> Interested to try as well.
>
> Tim
>
> On Tue, Jan 23, 2018 at 5:54 PM, Raj Adyanthaya  wrote:
> > Its very interesting and I do agree that it will get a lot of traction
> once
> > made open source.
> >
> > On Mon, Jan 22, 2018 at 9:01 PM, Rohit Karlupia 
> wrote:
> >>
> >> Hi,
> >>
> >> I have been working on making the performance tuning of spark
> applications
> >> bit easier.  We have just released the beta version of the tool on
> Qubole.
> >>
> >> https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/
> >>
> >> This is not OSS yet but we would like to contribute it to OSS.  Fishing
> >> for some interest in the community if people find this work interesting and
> >> would like to try it out.
> >>
> >> thanks,
> >> Rohit Karlupia
> >>
> >>
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Shmuel Blitz
Big Data Developer
Email: shmuel.bl...@similarweb.com
www.similarweb.com

 


Re: uncontinuous offset in kafka will cause the spark streaming failure

2018-01-24 Thread Cody Koeninger
When you say the patch is not suitable, can you clarify why?

Probably best to get the various findings centralized on
https://issues.apache.org/jira/browse/SPARK-17147

Happy to help with getting the patch up to date and working.
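
For anyone reading this later: the direction SPARK-17147 eventually took was a
switch that lets the DStream Kafka 0-10 consumer tolerate gaps in offsets
(compacted or transactional topics). A minimal sketch, assuming a Spark
release that already ships the fix; the config name below comes from that work
and should be verified against your version before relying on it:

import org.apache.spark.SparkConf

// Assumed setting from the SPARK-17147 work; not present in the Spark 2.0.1
// release discussed further down in this thread.
val conf = new SparkConf()
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")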

On Wed, Jan 24, 2018 at 1:19 AM, namesuperwood  wrote:
> It seems this patch is not suitable for our problem.
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
>
> wood.super
>
>  Original Message
> From: namesuperwood
> To: Justin Miller
> Cc: user; Cody Koeninger
> Sent: Wednesday, January 24, 2018, 14:45
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> Yes. My Spark Streaming application works with an uncompacted topic. I will
> check the patch.
>
>
> wood.super
>
>  Original Message
> From: Justin Miller
> To: namesuperwood
> Cc: user; Cody Koeninger
> Sent: Wednesday, January 24, 2018, 14:23
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> We appear to be kindred spirits; I’ve recently run into the same issue. Are
> you running compacted topics? I’ve run into this issue on non-compacted
> topics as well; it happens rarely but is still a pain. You might check out
> this patch and the related Spark Streaming Kafka ticket:
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
> https://issues.apache.org/jira/browse/SPARK-17147
>
> I’ll be testing out the patch on a somewhat large-scale stream processor
> tomorrow.
>
> CCing: Cody Koeninger
>
> Best,
> Justin
>
> On Jan 23, 2018, at 10:48 PM, namesuperwood  wrote:
>
> Hi all
>
> kafka version :  kafka_2.11-0.11.0.2
>spark version :  2.0.1
>
> A topic-partition "adn-tracking,15" in Kafka whose earliest offset is
> 1255644602 and whose latest offset is 1271253441.
>
> While starting a Spark Streaming job to process the data from the topic, we
> got an exception: "Got wrong record even after seeking to offset
> 1266921577".  [ (earliest offset) 1255644602 < 1266921577 <
> 1271253441 (latest offset) ]
>
> Finally, I found the following source code in the class CachedKafkaConsumer
> from spark-streaming. This is obviously due to the fact that the offset
> returned by the consumer's poll and the offset the consumer seeks to are not
> equal.
>
> Here is the CachedKafkaConsumer.scala code:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>   if (offset != nextOffset) {
>     logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>   }
>
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>
>   if (record.offset != offset) {
>     logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     record = buffer.next()
>     assert(record.offset == offset,
>       s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>   }
>
>   nextOffset = offset + 1
>   record
> }
>
> I reproduced this problem and found out that the offsets within this
> topic-partition are not contiguous in Kafka. I think this is a bug that needs
> to be repaired.
>
> I implemented a simple project that uses a consumer to seek to offset
> 1266921577, but it returns the record at offset 1266921578. When seeking to
> 1266921576, it returns offset 1266921576 exactly.
>
>
>
>
>
> Here is the code:
>
> public class consumerDemo {
>
>   public static void main(String[] argv) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "172.31.29.31:9091");
>     props.put("group.id", "consumer-tutorial-demo");
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>     Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>     collection.add(tp);
>     consumer.assign(collection);
>     consumer.seek(tp, 1266921576);
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
>     List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>     Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>     ConsumerRecord<String, String> record = iter.next();
>     System.out.println(" the next record " + record.offset() + " record topic "
>         + record.topic());
>   }
> }
>
>
>
>
>
>
>
>
>
>
> wood.super
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark Tuning Tool

2018-01-24 Thread Timothy Chen
Interested to try as well.

Tim

On Tue, Jan 23, 2018 at 5:54 PM, Raj Adyanthaya  wrote:
> Its very interesting and I do agree that it will get a lot of traction once
> made open source.
>
> On Mon, Jan 22, 2018 at 9:01 PM, Rohit Karlupia  wrote:
>>
>> Hi,
>>
>> I have been working on making the performance tuning of spark applications
>> bit easier.  We have just released the beta version of the tool on Qubole.
>>
>> https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/
>>
>> This is not OSS yet but we would like to contribute it to OSS.  Fishing
>> for some interest in the community if people find this work interesting and
> >> would like to try it out.
>>
>> thanks,
>> Rohit Karlupia
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark.sql call takes far too long

2018-01-24 Thread lucas.g...@gmail.com
Hi Michael.

I haven't had this particular issue previously, but I have had other
performance issues.

Some questions which may help:

1. Have you checked the Spark Console?
2. Have you isolated the query in question, are you sure it's actually
where the slowdown occurs?
3. How much data are you talking about and how complex is the query?

Usually when debugging Spark slowness issues, it comes down to ineffective
data ingestion and/or a partition shuffle (and in some cases both).

That can all be seen from the console.
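
One more thing that can help: roughly splitting the time between query
analysis/planning and actual execution. A minimal sketch, assuming Spark 2.x,
an existing SparkSession named spark and a query string named sql; the split
is approximate because parts of the pipeline are lazy:

// spark.sql() parses and analyzes the query but runs no job yet.
val t0 = System.nanoTime()
val df = spark.sql(sql)
val t1 = System.nanoTime()

// Forcing the executed plan triggers optimization and physical planning.
df.queryExecution.executedPlan
val t2 = System.nanoTime()

// An action finally runs the job on the cluster.
df.count()
val t3 = System.nanoTime()

println(f"analysis ${(t1 - t0) / 1e9}%.2fs, planning ${(t2 - t1) / 1e9}%.2fs, execution ${(t3 - t2) / 1e9}%.2fs")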

Good luck!

Gary Lucas

On 24 January 2018 at 04:16, Michael Shtelma  wrote:

> Hi all,
>
> I have a problem with the performance of the sparkSession.sql call. It
> takes up to a couple of seconds for me right now. I have a lot of
> generated temporary tables, which are registered within the session
> and also a lot of temporary data frames. Is it possible that the
> analysis/resolution phases take far too long? Is there a way to
> figure out what exactly takes too long?
>
> Does anybody have any ideas on this?
> Any assistance would be greatly appreciated!
>
> Thanks,
> Michael
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


spark.sql call takes far too long

2018-01-24 Thread Michael Shtelma
Hi all,

I have a problem with the performance of the sparkSession.sql call. It
takes up to a couple of seconds for me right now. I have a lot of
generated temporary tables, which are registered within the session
and also a lot of temporary data frames. Is it possible that the
analysis/resolution phases take far too long? Is there a way to
figure out what exactly takes too long?

Does anybody have any ideas on this?
Any assistance would be greatly appreciated!

Thanks,
Michael

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org