Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Bryan Jeffrey
Cody,

Yes - I was able to verify that I am not seeing duplicate calls to
createDirectStream.  If the master version of spark-streaming-kafka-0-10 will
work on a 2.3 cluster, I can go ahead and give that a shot.

Regards,

Bryan Jeffrey

On Fri, Aug 31, 2018 at 11:56 AM Cody Koeninger  wrote:

> Just to be 100% sure, when you're logging the group id in
> createDirectStream, you no longer see any duplicates?
>
> Regarding testing master, is the blocker that your spark cluster is on
> 2.3?  There's at least a reasonable chance that building an
> application assembly jar that uses the master version just for the
> spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster
>
> On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey 
> wrote:
> > Cody,
> >
> > We are connecting to multiple clusters for each topic.  I did experiment
> > this morning both with adding a cluster identifier to the group id and
> > with moving to use only a single one of our clusters.  Neither of these
> > was successful.  I am not able to run a test against master now.
> >
> > Regards,
> >
> > Bryan Jeffrey
> >
> >
> >
> >
> > On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger 
> wrote:
> >>
> >> I doubt that fix will get backported to 2.3.x
> >>
> >> Are you able to test against master?  2.4 with the fix you linked to
> >> is likely to hit code freeze soon.
> >>
> >> From a quick look at your code, I'm not sure why you're mapping over
> >> an array of brokers.  It seems like that would result in different
> >> streams with the same group id, because broker isn't part of your
> >> group id string.
> >>
> >> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <
> bryan.jeff...@gmail.com>
> >> wrote:
> >> > Hello, Spark Users.
> >> >
> >> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
> >> > have a Spark streaming job, and we're reading a reasonable amount of
> >> > data from Kafka (40 GB / minute or so).  We would like to move to the
> >> > Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to
> >> > modify message formats.
> >> >
> >> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> >> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> >> > tried to
> >> > work around it as follows:
> >> >
> >> > 1. Disabled consumer caching.  This increased the total job time from
> ~1
> >> > minute per batch to ~1.8 minutes per batch.  This performance penalty
> is
> >> > unacceptable for our use-case. We also saw some partitions stop
> >> > receiving
> >> > for an extended period of time - I was unable to get a simple repro
> for
> >> > this
> >> > effect though.
> >> > 2. Disabled speculation and multiple-job concurrency and added caching
> >> > for
> >> > the stream directly after reading from Kafka & caching offsets.  This
> >> > approach seems to work well for simple examples (read from a Kafka
> >> > topic,
> >> > write to another topic). However, when we move through more complex
> >> > logic we
> >> > continue to see this type of error - despite only creating the stream
> >> > for a
> >> > given topic a single time.  We validated that we're creating the
> stream
> >> > from
> >> > a given topic / partition a single time by logging on stream creation,
> >> > caching the stream and (eventually) calling 'runJob' to actually go
> and
> >> > fetch the data. Nonetheless with multiple outputs we see the
> >> > ConcurrentModificationException.
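A minimal sketch of the consumer-cache toggle behind workaround 1 above,
assuming the spark-streaming-kafka-0-10 integration; the setting is documented
for that connector, and the trade-off is the per-batch slowdown described.

// Sketch: disable the cached Kafka consumers for the 0-10 direct stream.
// Per the report above, batch time grew from ~1 to ~1.8 minutes with this off.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-no-consumer-cache")
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")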
> >> >
> >> > I've included some code down below.  I would be happy if anyone had
> >> > debugging tips for the workaround.  However, my main concern is to
> >> > ensure
> >> > that the 2.4 version will have a bug fix that will work for Spark
> >> > Streaming
> >> > in which multiple input topics map data to multiple outputs. I would
> >> > also
> >> > like to understand if the fix
> >> > (https://github.com/apache/spark/pull/20997)
> >> > will be backported to Spark 2.3.x
> >> >
> >> > In our code, read looks like the following:
> >> >
> >> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >> >
> >> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] =
> >> > Map()
> >> >
> >> > // Given inputs return a direct stream.
> >> > def createDirectStream(ssc: StreamingContext,
> >> >additionalKafkaParameters: Map[String, String],
> >> >brokersToUse: Array[String], //
> >> > broker1,broker2|broker3,broker4
> >> >topicsToUse: Array[String],
> >> >applicationName: String,
> >> >persist: Option[PersistenceManager],
> >> >useOldestOffsets: Boolean,
> >> >maxRatePerPartition: Long,
> >> >batchSeconds: Int
> >> >   ): DStream[DecodedData] = {
> >> >   val streams: Array[DStream[DecodedData]] =
> >> > brokersToUse.map(brokers => {
> >> >   val groupId = 

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Cody Koeninger
Just to be 100% sure, when you're logging the group id in
createDirectStream, you no longer see any duplicates?

Regarding testing master, is the blocker that your spark cluster is on
2.3?  There's at least a reasonable chance that building an
application assembly jar that uses the master version just for the
spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster
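A build.sbt sketch of that approach; the version strings are illustrative
assumptions rather than anything stated in the thread. The cluster-provided
Spark 2.3 artifacts stay "provided" and only the connector built from master
goes into the application assembly jar.

// Hypothetical build.sbt fragment; adjust versions to whatever your local
// build of master publishes.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0" % "provided",
  // connector built from master and published to the local ivy/maven repository
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0-SNAPSHOT"
)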

On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey  wrote:
> Cody,
>
> We are connecting to multiple clusters for each topic.  I did experiment
> this morning both with adding a cluster identifier to the group id and with
> moving to use only a single one of our clusters.  Neither of these was
> successful.  I am not able to run a test against master now.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
>
> On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger  wrote:
>>
>> I doubt that fix will get backported to 2.3.x
>>
>> Are you able to test against master?  2.4 with the fix you linked to
>> is likely to hit code freeze soon.
>>
>> From a quick look at your code, I'm not sure why you're mapping over
>> an array of brokers.  It seems like that would result in different
>> streams with the same group id, because broker isn't part of your
>> group id string.
>>
>> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey 
>> wrote:
>> > Hello, Spark Users.
>> >
>> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
>> > have a Spark streaming job, and we're reading a reasonable amount of data
>> > from Kafka (40 GB / minute or so).  We would like to move to the Kafka
>> > 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to modify
>> > message formats.
>> >
>> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
>> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
>> > tried to
>> > work around it as follows:
>> >
>> > 1. Disabled consumer caching.  This increased the total job time from ~1
>> > minute per batch to ~1.8 minutes per batch.  This performance penalty is
>> > unacceptable for our use-case. We also saw some partitions stop
>> > receiving
>> > for an extended period of time - I was unable to get a simple repro for
>> > this
>> > effect though.
>> > 2. Disabled speculation and multiple-job concurrency and added caching
>> > for
>> > the stream directly after reading from Kafka & caching offsets.  This
>> > approach seems to work well for simple examples (read from a Kafka
>> > topic,
>> > write to another topic). However, when we move through more complex
>> > logic we
>> > continue to see this type of error - despite only creating the stream
>> > for a
>> > given topic a single time.  We validated that we're creating the stream
>> > from
>> > a given topic / partition a single time by logging on stream creation,
>> > caching the stream and (eventually) calling 'runJob' to actually go and
>> > fetch the data. Nonetheless with multiple outputs we see the
>> > ConcurrentModificationException.
>> >
>> > I've included some code down below.  I would be happy if anyone had
>> > debugging tips for the workaround.  However, my main concern is to
>> > ensure
>> > that the 2.4 version will have a bug fix that will work for Spark
>> > Streaming
>> > in which multiple input topics map data to multiple outputs. I would
>> > also
>> > like to understand if the fix
>> > (https://github.com/apache/spark/pull/20997)
>> > will be backported to Spark 2.3.x
>> >
>> > In our code, read looks like the following:
>> >
>> > case class StreamLookupKey(topic: Set[String], brokers: String)
>> >
>> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] =
>> > Map()
>> >
>> > // Given inputs return a direct stream.
>> > def createDirectStream(ssc: StreamingContext,
>> >additionalKafkaParameters: Map[String, String],
>> >brokersToUse: Array[String], //
>> > broker1,broker2|broker3,broker4
>> >topicsToUse: Array[String],
>> >applicationName: String,
>> >persist: Option[PersistenceManager],
>> >useOldestOffsets: Boolean,
>> >maxRatePerPartition: Long,
>> >batchSeconds: Int
>> >   ): DStream[DecodedData] = {
>> >   val streams: Array[DStream[DecodedData]] =
>> > brokersToUse.map(brokers => {
>> >   val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
>> >   val kafkaParameters: Map[String, String] =
>> > getKafkaParameters(brokers,
>> > useOldestOffsets, groupId) ++ additionalKafkaParameters
>> >   logger.info(s"Kafka Params: ${kafkaParameters}")
>> >   val topics = topicsToUse.toSet
>> >   logger.info(s"Creating Kafka direct connection -
>> > ${kafkaParameters.mkString(GeneralConstants.comma)} " +
>> > s"topics: ${topics.mkString(GeneralConstants.comma)} w/
>> > applicationGroup: ${groupId}")
>> >
>> >   

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Bryan Jeffrey
Cody,

We are connecting to multiple clusters for each topic.  I did experiment
this morning both with adding a cluster identifier to the group id and with
moving to use only a single one of our clusters.  Neither of these was
successful.  I am not able to run a test against master now.

Regards,

Bryan Jeffrey




On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger  wrote:

> I doubt that fix will get backported to 2.3.x
>
> Are you able to test against master?  2.4 with the fix you linked to
> is likely to hit code freeze soon.
>
> From a quick look at your code, I'm not sure why you're mapping over
> an array of brokers.  It seems like that would result in different
> streams with the same group id, because broker isn't part of your
> group id string.
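An illustrative helper for the point above - deriving the group id from the
broker list so each per-cluster stream gets its own consumer group. Bryan
notes above that a variant of this did not resolve his issue, so this is a
sketch of the suggestion rather than a confirmed fix.

// Sketch: fold the broker string into the group id; names mirror the
// parameters of the quoted createDirectStream code.
def groupIdFor(applicationName: String, brokers: String, topicsToUse: Array[String]): String = {
  val clusterTag = brokers.replaceAll("[^A-Za-z0-9]", "_")
  s"$applicationName~$clusterTag~${topicsToUse.mkString("~")}"
}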
>
> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey 
> wrote:
> > Hello, Spark Users.
> >
> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We
> > have a Spark streaming job, and we're reading a reasonable amount of data
> > from Kafka (40 GB / minute or so).  We would like to move to the Kafka
> > 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to modify
> > message formats.
> >
> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> tried to
> > work around it as follows:
> >
> > 1. Disabled consumer caching.  This increased the total job time from ~1
> > minute per batch to ~1.8 minutes per batch.  This performance penalty is
> > unacceptable for our use-case. We also saw some partitions stop receiving
> > for an extended period of time - I was unable to get a simple repro for
> this
> > effect though.
> > 2. Disabled speculation and multiple-job concurrency and added caching
> for
> > the stream directly after reading from Kafka & caching offsets.  This
> > approach seems to work well for simple examples (read from a Kafka topic,
> > write to another topic). However, when we move through more complex
> logic we
> > continue to see this type of error - despite only creating the stream
> for a
> > given topic a single time.  We validated that we're creating the stream
> from
> > a given topic / partition a single time by logging on stream creation,
> > caching the stream and (eventually) calling 'runJob' to actually go and
> > fetch the data. Nonetheless with multiple outputs we see the
> > ConcurrentModificationException.
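A rough sketch of workaround 2 above: persist the stream immediately after the
Kafka read and capture the offset ranges before any other transformation. The
stream argument is assumed to be the 0-10 direct stream; offset commit handling
is left out.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Sketch: cache the raw stream right after reading and hand the offset ranges
// to the caller; this must be the first transformation applied to the stream.
def cacheWithOffsets[K, V](stream: DStream[ConsumerRecord[K, V]],
                           onOffsets: Array[OffsetRange] => Unit): DStream[ConsumerRecord[K, V]] =
  stream.transform { rdd =>
    onOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    rdd
  }.persist(StorageLevel.MEMORY_ONLY)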
> >
> > I've included some code down below.  I would be happy if anyone had
> > debugging tips for the workaround.  However, my main concern is to ensure
> > that the 2.4 version will have a bug fix that will work for Spark
> Streaming
> > in which multiple input topics map data to multiple outputs. I would also
> > like to understand if the fix (
> https://github.com/apache/spark/pull/20997)
> > will be backported to Spark 2.3.x
> >
> > In our code, read looks like the following:
> >
> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >
> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >
> > // Given inputs return a direct stream.
> > def createDirectStream(ssc: StreamingContext,
> >additionalKafkaParameters: Map[String, String],
> >brokersToUse: Array[String], //
> > broker1,broker2|broker3,broker4
> >topicsToUse: Array[String],
> >applicationName: String,
> >persist: Option[PersistenceManager],
> >useOldestOffsets: Boolean,
> >maxRatePerPartition: Long,
> >batchSeconds: Int
> >   ): DStream[DecodedData] = {
> >   val streams: Array[DStream[DecodedData]] =
> > brokersToUse.map(brokers => {
> >   val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >   val kafkaParameters: Map[String, String] =
> getKafkaParameters(brokers,
> > useOldestOffsets, groupId) ++ additionalKafkaParameters
> >   logger.info(s"Kafka Params: ${kafkaParameters}")
> >   val topics = topicsToUse.toSet
> >   logger.info(s"Creating Kafka direct connection -
> > ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> > s"topics: ${topics.mkString(GeneralConstants.comma)} w/
> > applicationGroup: ${groupId}")
> >
> >   streamMap.getOrElse(StreamLookupKey(topics, brokers),
> > createKafkaStream(ssc, applicationName, topics, brokers,
> > maxRatePerPartition, batchSeconds, kafkaParameters))
> > })
> >
> >   ssc.union(streams)
> > }
> >
> > private def createKafkaStream(ssc: StreamingContext, applicationName:
> > String, topics: Set[String], brokers: String,
> >   maxRatePerPartition: Long, batchSeconds:
> Int,
> > kafkaParameters: Map[String,String]): DStream[DecodedData] = {
> >   logger.info(s"Creating a stream from Kafka for application
> > ${applicationName} w/ topic ${topics} and " +

read snappy compressed files in spark

2018-08-31 Thread Ricky
I want to be able to read Snappy-compressed files in Spark. I can do a
val df = spark.read.textFile("hdfs:// path")
and it passes that test in spark-shell, but beyond that, when I do a
df.show(10,false) or something, it shows me binary data mixed with real
text. How do I read the decompressed file in Spark? I can build a
DataFrame reader if someone guides or nudges me in the right direction ...
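Not a definitive answer, but a sketch of the common case where the files were
written with the Hadoop Snappy codec (typically a .snappy extension): the text
reader picks the codec from the file extension and decompresses transparently.
Files compressed with raw or framed Snappy outside Hadoop are not decoded that
way, which can produce exactly the binary-mixed-with-text output described.

// Sketch, assuming Hadoop-codec-compressed text files; the path is a placeholder.
val df = spark.read.textFile("hdfs:///data/input/*.snappy")
df.show(10, false)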


Type change support in spark parquet read-write

2018-08-31 Thread Swapnil Chougule
Hi Folks,

I came across a problem while reading Parquet through Spark.
A Parquet file has been written with field 'a' of type 'Integer'.
Afterwards, reading this file with a schema declaring 'a' as 'Long' gives an
exception. I thought this compatible type change was supported, but it is not
working. Code snippet:

val oldSchema =
  StructType(
StructField("a", IntegerType, true) :: Nil)

val df1 = spark.read.schema(oldSchema).json("/path/to/json/data")
df1.write.parquet("/path/to/parquet/data")

val newSchema =
  StructType(
StructField("a", LongType, true) :: Nil)

spark.read.schema(newSchema).parquet("/path/to/parquet/data").show()


Any help around this is really appreciated.

Thanks,
Swapnil
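A possible workaround, assuming the Parquet reader in this Spark version does
not widen Integer to Long at read time: read the column with its written type
and cast it afterwards.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// Sketch: read 'a' as it was written (Integer) and promote it to Long afterwards.
val widened = spark.read.parquet("/path/to/parquet/data")
  .withColumn("a", col("a").cast(LongType))
widened.printSchema()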


is spark TempView thread safe

2018-08-31 Thread 崔苗
Hi,
we know multiple parallel jobs can run simultaneously if they are submitted
from separate threads. Now we want to reduce time cost by using multiple
threads in one application (SparkSession). I want to know: is tempView thread
safe, and what happens if we create the same tempView from several threads in
one SparkSession?
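Not an authoritative answer, but a minimal sketch of the pattern in question,
assuming one shared SparkSession: createOrReplaceTempView mutates shared
catalog state, so giving each thread its own view name avoids two threads
redefining the same name underneath each other.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch: two jobs submitted from separate threads against one SparkSession.
// Input paths and view names are placeholders.
val results = Seq("view_a" -> "/data/a", "view_b" -> "/data/b").map {
  case (viewName, path) =>
    Future {
      val df = spark.read.parquet(path)
      df.createOrReplaceTempView(viewName)
      spark.sql(s"SELECT COUNT(*) FROM $viewName").collect()
    }
}
results.foreach(f => Await.result(f, Duration.Inf))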