Re: Structured Streaming : Custom Source and Sink Development and PySpark.

2018-08-30 Thread Russell Spitzer
Yes, Scala or Java.

No. Once you have written the implementation, it is valid in all of the
DataFrame APIs.

As for examples there are many; check the Kafka source in-tree, or one of
the many sources listed on the Spark Packages website.
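
For a rough illustration only, here is a minimal sketch of a custom sink written
against the Spark 2.x Sink / StreamSinkProvider APIs; the class and format names
below are hypothetical, and the sources mentioned above are the better reference.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// The sink receives each micro-batch as a DataFrame.
class LoggingSink(options: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Replace this with writes to your external system; collect() is fine for a
    // sketch but pulls the whole batch to the driver.
    println(s"batch $batchId contained ${data.collect().length} rows")
  }
}

// Provider looked up by format name; list this class in
// META-INF/services/org.apache.hadoop... no -- in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
class LoggingSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def shortName(): String = "logging-sink"
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new LoggingSink(parameters)
}

Once the jar is on the classpath, PySpark can use it by name, e.g.
df.writeStream.format("logging-sink").option("checkpointLocation", "/tmp/chk").start(),
with no Py4J glue code needed.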

On Thu, Aug 30, 2018, 8:23 PM Ramaswamy, Muthuraman <
muthuraman.ramasw...@viasat.com> wrote:

> I would like to develop a custom Source and Sink, so I have a couple of
> questions:
>
>
>
>1. Do I have to use Scala or Java to develop these Custom Source/Sink?
>
>
>
>    2. Also, once the source/sink has been developed, to use it in
>    PySpark/Python, do I have to develop any Py4J modules? Any pointers, good
>    documentation, or a GitHub source as a reference would be of great help.
>
>
>
> Please advise.
>
>
>
> Thank you,
>


Structured Streaming : Custom Source and Sink Development and PySpark.

2018-08-30 Thread Ramaswamy, Muthuraman
I would like to develop a custom Source and Sink, so I have a couple of 
questions:


  1.  Do I have to use Scala or Java to develop these Custom Source/Sink?


  2.  Also, once the source/sink has been developed, to use it in PySpark/Python, 
do I have to develop any Py4J modules? Any pointers, good documentation, or a 
GitHub source as a reference would be of great help.

Please advise.

Thank you,





Re: CSV parser - how to parse column containing json data

2018-08-30 Thread Brandon Geise
If you know your JSON schema, you can create a struct and then apply it using
from_json:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val json_schema = StructType(Array(StructField("x", StringType, true),
  StructField("y", StringType, true), StructField("z", IntegerType, true)))

.withColumn("_c3", from_json(col("_c3_signals"), json_schema))
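
For context, a hedged sketch of the read side, continuing the snippet above (not
a definitive recipe): it assumes the file is read without a header (so Spark
names the columns _c0, _c1, ...), that the JSON text survives CSV parsing into
_c3 (the embedded quotes in the sample row would need escaping or suitable
quote/escape options), and that the input path is a placeholder.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("csv-json-column").getOrCreate()

// json_schema, col and from_json as in the snippet above.
val parsed = spark.read
  .option("header", "false")
  .csv("/path/to/input.csv")
  .withColumnRenamed("_c3", "_c3_signals")
  .withColumn("_c3", from_json(col("_c3_signals"), json_schema))

// The parsed JSON is now a struct whose fields can be selected directly.
parsed.select(col("_c3.x"), col("_c3.y"), col("_c3.z")).show()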

 

From: Nirav Patel 
Date: Thursday, August 30, 2018 at 7:19 PM
To: spark users 
Subject: CSV parser - how to parse column containing json data

 

Is there a way to parse a CSV file where a column in the middle contains a JSON 
data structure?

 

"a",102,"c","{"x":"xx","y":false,"z":123}","d","e",102.2

 

 

Thanks,

Nirav










CSV parser - how to parse column containing json data

2018-08-30 Thread Nirav Patel
Is there a way to parse a CSV file where a column in the middle contains a JSON
data structure?

"a",102,"c","{"x":"xx","y":false,"z":123}","d","e",102.2


Thanks,
Nirav



Local mode vs client mode with one executor

2018-08-30 Thread Guillermo Ortiz
I have many Spark processes. Some of them are pretty simple and barely have
any messages to process, but they were developed from the same archetype and
they use Spark.

Some of them are executed with many executors, but for a few it doesn't make
sense to use more than 2-4 cores in a single executor. The main reason is that
the volume of messages is so low that more isn't worth it.

The question is: is there any disadvantage to running these few Spark
processes in local[2..4] instead of cluster/client mode with one executor (4
cores) and one driver? I have read that local mode is mostly a testing mode,
and I use it for my tests too ;)
Besides, it seems to run faster in local mode in those cases.
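
For illustration, a hedged sketch of the two submission styles being compared
(the application class and jar names are placeholders):

# Local mode: the driver and executor run in a single JVM with 4 worker threads.
spark-submit --master local[4] --class com.example.MyApp my-app.jar

# Client mode on a cluster: one driver plus a single 4-core executor.
spark-submit --master yarn --deploy-mode client \
  --num-executors 1 --executor-cores 4 --executor-memory 4g \
  --class com.example.MyApp my-app.jar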


Re: spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
I solved this issue by using spark 2.3.1 jars copied from the HDP3.0
cluster. Thanks.
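
For anyone hitting the same thing, a hedged build.sbt sketch of one way to keep
cluster-provided jars out of a fat jar (versions here are assumptions based on
this thread):

// Mark Spark as "provided" so the assembly does not bundle its transitive
// Hadoop 2.6.x client jars; at runtime the cluster's own Spark/Hadoop jars are used.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.3.1" % "provided"
)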

On Thu, Aug 30, 2018 at 10:18 AM Lian Jiang  wrote:

> Per https://spark.apache.org/docs/latest/building-spark.html, spark 2.3.1
> is built with hadoop 2.6.X by default. This is why I see my fat jar
> includes hadoop 2.6.5 (instead of 3.1.0) jars. HftpFileSystem has been
> removed in hadoop 3.
>
> On https://spark.apache.org/downloads.html, I only see spark 2.3.1 built
> with hadoop 2.7. Where can I get spark 2.3.1 built with hadoop 3? Does
> spark 2.3.1 support hadoop 3?
>
> Appreciate your help.
>
> On Thu, Aug 30, 2018 at 8:59 AM Lian Jiang  wrote:
>
>> Hi,
>>
>> I am using HDP3.0, which uses Hadoop 3.1.0 and Spark 2.3.1. My Spark
>> streaming jobs, which run fine in HDP2.6.4 (Hadoop 2.7.3, Spark 2.2.0), fail
>> in HDP3:
>>
>> java.lang.IllegalAccessError: class
>> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
>> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
>>
>> at java.lang.ClassLoader.defineClass1(Native Method)
>>
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>>
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
>>
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>>
>> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>>
>> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
>>
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>>
>> at
>> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)
>>
>> at
>> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.(HadoopFileLinesReader.scala:46)
>>
>> at
>> org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:125)
>>
>> at
>> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:132)
>>
>> at
>> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:130)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
>>
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org
>> $apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>>
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>> Source)
>>
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>>
>> at
>> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>>
>> at
>> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>>
>> at
>> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>>
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>
>> at 

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-30 Thread Cody Koeninger
You're using an older version of Spark, and it looks like you've manually
included a different version of the kafka-clients jar (1.0) than the one
that version of the Spark connector was written against (0.10.0.1), so
there's no telling what's going on.
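
(A hedged dependency sketch of the alignment being described; the idea is to let
the connector pull in the kafka-clients version it was built against rather than
also packaging 1.0.0:)

// build.sbt sketch: depend on the connector only; its transitive kafka-clients
// (0.10.0.1 for this connector version) then ends up on the classpath.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.2"
// Do not also add a newer org.apache.kafka "kafka-clients" dependency.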

On Wed, Aug 29, 2018 at 3:40 PM, Guillermo Ortiz Fernández
 wrote:
> I can't... do you think it could be a bug in this version, from Spark or
> Kafka?
>
> El mié., 29 ago. 2018 a las 22:28, Cody Koeninger ()
> escribió:
>>
>> Are you able to try a recent version of spark?
>>
>> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>>  wrote:
>> > I'm using Spark Streaming 2.0.1 with Kafka 0.10; sometimes I get this
>> > exception and Spark dies.
>> >
>> > I couldn't see any error or problem on the machines. Does anybody know
>> > the reason for this error?
>> >
>> >
>> > java.lang.IllegalStateException: This consumer has already been closed.
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>> > ~[kafka-clients-1.0.0.jar:na]
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
>> > ~[kafka-clients-1.0.0.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at scala.Option.orElse(Option.scala:289)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> > scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > 

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-30 Thread Cody Koeninger
I doubt that fix will get backported to 2.3.x

Are you able to test against master?  2.4 with the fix you linked to
is likely to hit code freeze soon.

From a quick look at your code, I'm not sure why you're mapping over
an array of brokers.  It seems like that would result in different
streams with the same group id, because broker isn't part of your
group id string.
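
(If one stream per broker list is the intent, a purely illustrative tweak would
be to fold the broker list into the group id, e.g.

  val groupId = s"${applicationName}~${topicsToUse.mkString("~")}~${brokers}"

so the streams created inside brokersToUse.map(...) no longer share a consumer
group.)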

On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey  wrote:
> Hello, Spark Users.
>
> We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
> have a Spark streaming job, and we're reading a reasonable amount of data
> from Kafka (40 GB / minute or so).  We would like to move to using the Kafka
> 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to modify
> formats.
>
> We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've tried to
> work around it as follows:
>
> 1. Disabled consumer caching.  This increased the total job time from ~1
> minute per batch to ~1.8 minutes per batch.  This performance penalty is
> unacceptable for our use-case. We also saw some partitions stop receiving
> for an extended period of time - I was unable to get a simple repro for this
> effect though.
> 2. Disabled speculation and multiple-job concurrency and added caching for
> the stream directly after reading from Kafka & caching offsets.  This
> approach seems to work well for simple examples (read from a Kafka topic,
> write to another topic). However, when we move through more complex logic we
> continue to see this type of error - despite only creating the stream for a
> given topic a single time.  We validated that we're creating the stream from
> a given topic / partition a single time by logging on stream creation,
> caching the stream and (eventually) calling 'runJob' to actually go and
> fetch the data. Nonetheless with multiple outputs we see the
> ConcurrentModificationException.
>
> I've included some code down below.  I would be happy if anyone had
> debugging tips for the workaround.  However, my main concern is to ensure
> that the 2.4 version will have a bug fix that will work for Spark Streaming
> in which multiple input topics map data to multiple outputs. I would also
> like to understand if the fix (https://github.com/apache/spark/pull/20997)
> will be backported to Spark 2.3.x
>
> In our code, read looks like the following:
>
> case class StreamLookupKey(topic: Set[String], brokers: String)
>
> private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
>
> // Given inputs return a direct stream.
> def createDirectStream(ssc: StreamingContext,
>additionalKafkaParameters: Map[String, String],
>brokersToUse: Array[String], //
> broker1,broker2|broker3,broker4
>topicsToUse: Array[String],
>applicationName: String,
>persist: Option[PersistenceManager],
>useOldestOffsets: Boolean,
>maxRatePerPartition: Long,
>batchSeconds: Int
>   ): DStream[DecodedData] = {
>   val streams: Array[DStream[DecodedData]] =
> brokersToUse.map(brokers => {
>   val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
>   val kafkaParameters: Map[String, String] = getKafkaParameters(brokers,
> useOldestOffsets, groupId) ++ additionalKafkaParameters
>   logger.info(s"Kafka Params: ${kafkaParameters}")
>   val topics = topicsToUse.toSet
>   logger.info(s"Creating Kafka direct connection -
> ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> s"topics: ${topics.mkString(GeneralConstants.comma)} w/
> applicationGroup: ${groupId}")
>
>   streamMap.getOrElse(StreamLookupKey(topics, brokers),
> createKafkaStream(ssc, applicationName, topics, brokers,
> maxRatePerPartition, batchSeconds, kafkaParameters))
> })
>
>   ssc.union(streams)
> }
>
> private def createKafkaStream(ssc: StreamingContext, applicationName:
> String, topics: Set[String], brokers: String,
>   maxRatePerPartition: Long, batchSeconds: Int,
> kafkaParameters: Map[String,String]): DStream[DecodedData] = {
>   logger.info(s"Creating a stream from Kafka for application
> ${applicationName} w/ topic ${topics} and " +
> s"brokers: ${brokers.split(',').head} with parameters:
> ${kafkaParameters.mkString("|")}")
>   try {
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
>   KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferBrokers, consumerStrategy = consumerStrategy)
>
> KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream,
> maxRatePerPartition, batchSeconds)
> val result =
> 

ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-30 Thread Bryan Jeffrey
Hello, Spark Users.

We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
have a Spark streaming job, and we're reading a reasonable amount of data
from Kafka (40 GB / minute or so).  We would like to move to using the
Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to
modify formats.

We've run into https://issues.apache.org/jira/browse/SPARK-19185,
'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've tried
to work around it as follows:

1. Disabled consumer caching.  This increased the total job time from ~1
minute per batch to ~1.8 minutes per batch.  This performance penalty is
unacceptable for our use-case. We also saw some partitions stop receiving
for an extended period of time - I was unable to get a simple repro for
this effect though.
2. Disabled speculation and multiple-job concurrency and added caching for
the stream directly after reading from Kafka & caching offsets.  This
approach seems to work well for simple examples (read from a Kafka topic,
write to another topic). However, when we move through more complex logic
we continue to see this type of error - despite only creating the stream
for a given topic a single time.  We validated that we're creating the
stream from a given topic / partition a single time by logging on stream
creation, caching the stream and (eventually) calling 'runJob' to actually
go and fetch the data. Nonetheless with multiple outputs we see the
ConcurrentModificationException.

I've included some code down below.  I would be happy if anyone had
debugging tips for the workaround.  However, my main concern is to ensure
that the 2.4 version will have a bug fix that will work for Spark Streaming
in which multiple input topics map data to multiple outputs. I would also
like to understand if the fix (https://github.com/apache/spark/pull/20997)
will be backported to Spark 2.3.x

In our code, read looks like the following:

case class StreamLookupKey(topic: Set[String], brokers: String)

private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()

// Given inputs return a direct stream.
def createDirectStream(ssc: StreamingContext,
   additionalKafkaParameters: Map[String, String],
   brokersToUse: Array[String], //
broker1,broker2|broker3,broker4
   topicsToUse: Array[String],
   applicationName: String,
   persist: Option[PersistenceManager],
   useOldestOffsets: Boolean,
   maxRatePerPartition: Long,
   batchSeconds: Int
  ): DStream[DecodedData] = {
  val streams: Array[DStream[DecodedData]] =
brokersToUse.map(brokers => {
  val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
  val kafkaParameters: Map[String, String] =
getKafkaParameters(brokers, useOldestOffsets, groupId) ++
additionalKafkaParameters
  logger.info(s"Kafka Params: ${kafkaParameters}")
  val topics = topicsToUse.toSet
  logger.info(s"Creating Kafka direct connection -
${kafkaParameters.mkString(GeneralConstants.comma)} " +
s"topics: ${topics.mkString(GeneralConstants.comma)} w/
applicationGroup: ${groupId}")

  streamMap.getOrElse(StreamLookupKey(topics, brokers),
createKafkaStream(ssc, applicationName, topics, brokers,
maxRatePerPartition, batchSeconds, kafkaParameters))
})

  ssc.union(streams)
}

private def createKafkaStream(ssc: StreamingContext, applicationName:
String, topics: Set[String], brokers: String,
  maxRatePerPartition: Long, batchSeconds:
Int, kafkaParameters: Map[String,String]): DStream[DecodedData] = {
  logger.info(s"Creating a stream from Kafka for application
${applicationName} w/ topic ${topics} and " +
s"brokers: ${brokers.split(',').head} with parameters:
${kafkaParameters.mkString("|")}")
  try {
val consumerStrategy = ConsumerStrategies.Subscribe[String,
DecodedData](topics.toSeq, kafkaParameters)
val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
  KafkaUtils.createDirectStream(ssc, locationStrategy =
LocationStrategies.PreferBrokers, consumerStrategy = consumerStrategy)

KafkaStreamFactory.writeStreamOffsets(applicationName, brokers,
stream, maxRatePerPartition, batchSeconds)
val result =
stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
streamMap += StreamLookupKey(topics, brokers) -> result
result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator:
Iterator[_]) => {}))
result
  } catch ErrorHandling.safelyCatch {
case e: Exception =>
  logger.error("Unable to create direct stream:")
  e.printStackTrace()
  throw KafkaDirectStreamException(topics.toArray, brokers, e)
  }
}

def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
applicationName: String): Map[String, String] =
  Map[String, String](
"auto.offset.reset" -> 

Re: Default Java Opts Standalone

2018-08-30 Thread Sonal Goyal
Hi Eevee,

For the executor, have you tried

a. Passing --conf "spark.executor.extraJavaOptions=-XX:+ExitOnOutOfMemoryError"
as part of the spark-submit command line if you want it application-specific, OR
b. Setting spark.executor.extraJavaOptions in conf/spark-defaults.conf for
all jobs? (See the sketch below.)
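
A minimal sketch of both, using the flag from your mail (other spark-submit
arguments elided):

# (a) per application, on the spark-submit command line:
spark-submit --conf "spark.executor.extraJavaOptions=-XX:+ExitOnOutOfMemoryError" ...

# (b) for all jobs, in conf/spark-defaults.conf:
spark.executor.extraJavaOptions  -XX:+ExitOnOutOfMemoryError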


Thanks,
Sonal
Nube Technologies 





On Thu, Aug 30, 2018 at 5:12 PM, Evelyn Bayes  wrote:

> Hey all,
>
> Stuck trying to set a parameter in the spark-env.sh and I’m hoping someone
> here knows how.
>
> I want to set the JVM setting -XX:+ExitOnOutOfMemoryError for both Spark
> executors and Spark workers in a standalone mode.
>
> So far my best guess is:
> *Worker*
> SPARK_WORKER_OPTS="${SPARK_WORKER_OPTS} -Dspark.worker.
> extraJavaOptions=-XX:+ExitOnOutOfMemoryError"
> *Executor*
> SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.executor.
> extraJavaOptions=-XX:+ExitOnOutOfMemoryError"
>
> Anyone know the actual way to set this or a good place to learn about how
> this stuff works? I’ve already seen the Spark conf and standalone
> documentation and it doesn’t really make this stuff clear.
>
> Thanks a bunch,
> Eevee.
>


Re: spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
Per https://spark.apache.org/docs/latest/building-spark.html, spark 2.3.1
is built with hadoop 2.6.X by default. This is why I see my fat jar
includes hadoop 2.6.5 (instead of 3.1.0) jars. HftpFileSystem has been
removed in hadoop 3.

On https://spark.apache.org/downloads.html, I only see spark 2.3.1 built
with hadoop 2.7. Where can I get spark 2.3.1 built with hadoop 3? Does
spark 2.3.1 support hadoop 3?

Appreciate your help.

On Thu, Aug 30, 2018 at 8:59 AM Lian Jiang  wrote:

> Hi,
>
> I am using HDP3.0, which uses Hadoop 3.1.0 and Spark 2.3.1. My Spark
> streaming jobs, which run fine in HDP2.6.4 (Hadoop 2.7.3, Spark 2.2.0), fail
> in HDP3:
>
> java.lang.IllegalAccessError: class
> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:348)
>
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
>
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
>
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>
> at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)
>
> at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.(HadoopFileLinesReader.scala:46)
>
> at
> org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:125)
>
> at
> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:132)
>
> at
> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:130)
>
> at
> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
>
> at
> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
>
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org
> $apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
>
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>
> at
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>
> at
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> 

spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
Hi,

I am using HDP3.0, which uses Hadoop 3.1.0 and Spark 2.3.1. My Spark
streaming jobs, which run fine in HDP2.6.4 (Hadoop 2.7.3, Spark 2.2.0), fail
in HDP3:

java.lang.IllegalAccessError: class
org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)

at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

at java.net.URLClassLoader$1.run(URLClassLoader.java:368)

at java.net.URLClassLoader$1.run(URLClassLoader.java:362)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:361)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:348)

at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)

at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)

at java.util.ServiceLoader$1.next(ServiceLoader.java:480)

at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)

at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)

at
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.(HadoopFileLinesReader.scala:46)

at
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:125)

at
org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:132)

at
org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:130)

at
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)

at
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)

at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org
$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)

at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)

at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)

at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)

at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)

at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)

at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)

at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)



Any idea? Thanks.


Unsubscribe

2018-08-30 Thread Michael Styles



Default Java Opts Standalone

2018-08-30 Thread Evelyn Bayes
Hey all,

Stuck trying to set a parameter in the spark-env.sh and I’m hoping someone here 
knows how.

I want to set the JVM setting -XX:+ExitOnOutOfMemoryError for both Spark 
executors and Spark workers in a standalone mode.

So far my best guess is:
Worker
SPARK_WORKER_OPTS="${SPARK_WORKER_OPTS} 
-Dspark.worker.extraJavaOptions=-XX:+ExitOnOutOfMemoryError"
Executor
SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} 
-Dspark.executor.extraJavaOptions=-XX:+ExitOnOutOfMemoryError"

Anyone know the actual way to set this or a good place to learn about how this 
stuff works? I’ve already seen the Spark conf and standalone documentation and 
it doesn’t really make this stuff clear.

Thanks a bunch,
Eevee.

Spark Structured Streaming checkpointing with S3 data source

2018-08-30 Thread sherif98
I have data that is continuously pushed to multiple S3 buckets. I want to set
up a structured streaming application that uses the S3 buckets as the data
source and does stream-stream joins.

My question is: if the application goes down for some reason, will restarting
the application continue processing data from S3 where it left off?

So for example, suppose I have 5 JSON files with 100 records in each file, and
Spark failed while processing the tenth record in the 3rd file. When the query
runs again, will it begin processing from the tenth record in the 3rd file?
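
(For reference, a minimal sketch of the kind of query described; the bucket paths
and schema are placeholders, and the checkpointLocation option is where Structured
Streaming records its progress across restarts:)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("s3-stream-join").getOrCreate()

// Streaming file sources need an explicit schema.
val schema = new StructType().add("id", StringType).add("value", StringType)

val left  = spark.readStream.schema(schema).json("s3a://bucket-a/prefix/")
val right = spark.readStream.schema(schema).json("s3a://bucket-b/prefix/")

left.join(right, "id")
  .writeStream
  .format("parquet")
  .option("path", "s3a://bucket-out/data/")
  .option("checkpointLocation", "s3a://bucket-out/checkpoint/")
  .start()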





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org