Just to be 100% sure, when you're logging the group id in
createDirectStream, you no longer see any duplicates?

Regarding testing master, is the blocker that your Spark cluster is on
2.3?  There's at least a reasonable chance that building an application
assembly jar that uses the master version just for the
spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster.
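
Roughly, what I have in mind in build.sbt (a sketch only - the version
string for the master build below is illustrative, use whatever version
you publish locally, and keep the cluster-provided Spark artifacts as
"provided"):

  libraryDependencies ++= Seq(
    // Spark provided by the 2.3 cluster, not bundled in the assembly
    "org.apache.spark" %% "spark-core"      % "2.3.0" % "provided",
    "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
    // locally published build of the master kafka-0-10 module,
    // bundled into the application assembly jar
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0-SNAPSHOT"
  )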

On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Cody,
>
> We are connecting to multiple clusters for each topic.  I experimented
> this morning both with adding a cluster identifier to the group id and
> with simply moving to use only a single one of our clusters.  Neither
> was successful.  I am not able to run a test against master now.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
>
> On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I doubt that fix will get backported to 2.3.x
>>
>> Are you able to test against master?  2.4 with the fix you linked to
>> is likely to hit code freeze soon.
>>
>> From a quick look at your code, I'm not sure why you're mapping over
>> an array of brokers.  It seems like that would result in different
>> streams with the same group id, because broker isn't part of your
>> group id string.
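>>
>> E.g. something along these lines (untested sketch), so that each set
>> of brokers gets its own consumer group:
>>
>>   val groupId =
>>     s"${applicationName}~${brokers}~${topicsToUse.mkString("~")}"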
>>
>> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>> > Hello, Spark Users.
>> >
>> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.
>> > We're running a Spark streaming job, reading a reasonable amount of
>> > data from Kafka (40 GB / minute or so).  We would like to move to
>> > the Kafka 0.10 client to avoid our (0.10.2.1) Kafka brokers having
>> > to modify message formats.
>> >
>> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
>> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
>> > tried to work around it as follows:
>> >
>> > 1. Disabled consumer caching (see the conf sketch just below this
>> > list).  This increased the total job time from ~1 minute per batch
>> > to ~1.8 minutes per batch.  This performance penalty is unacceptable
>> > for our use-case.  We also saw some partitions stop receiving for an
>> > extended period of time - I was unable to get a simple repro for
>> > this effect, though.
>> > 2. Disabled speculation and multiple-job concurrency, and added
>> > caching for the stream directly after reading from Kafka & caching
>> > offsets.  This approach seems to work well for simple examples (read
>> > from a Kafka topic, write to another topic).  However, when we move
>> > through more complex logic we continue to see this type of error -
>> > despite only creating the stream for a given topic a single time.
>> > We validated that we're creating the stream from a given topic /
>> > partition a single time by logging on stream creation, caching the
>> > stream and (eventually) calling 'runJob' to actually go and fetch
>> > the data.  Nonetheless, with multiple outputs we see the
>> > ConcurrentModificationException.
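>> >
>> > (Rough sketch of disabling the consumer cache mentioned in (1), via
>> > the spark.streaming.kafka.consumer.cache.enabled setting from the
>> > spark-streaming-kafka-0-10 module; the app name and batch interval
>> > below are placeholders:)
>> >
>> >   val conf = new SparkConf()
>> >     .setAppName(applicationName)
>> >     .set("spark.streaming.kafka.consumer.cache.enabled", "false")
>> >   val ssc = new StreamingContext(conf, Seconds(batchSeconds))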
>> >
>> > I've included some code down below.  I would be happy if anyone had
>> > debugging tips for the workaround.  However, my main concern is to
>> > ensure that the 2.4 version will have a bug fix that works for Spark
>> > Streaming jobs in which multiple input topics map data to multiple
>> > outputs.  I would also like to understand whether the fix
>> > (https://github.com/apache/spark/pull/20997) will be backported to
>> > Spark 2.3.x.
>> >
>> > In our code, read looks like the following:
>> >
>> > case class StreamLookupKey(topic: Set[String], brokers: String)
>> >
>> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
>> >
>> > // Given inputs return a direct stream.
>> > def createDirectStream(ssc: StreamingContext,
>> >                        additionalKafkaParameters: Map[String, String],
>> >                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
>> >                        topicsToUse: Array[String],
>> >                        applicationName: String,
>> >                        persist: Option[PersistenceManager],
>> >                        useOldestOffsets: Boolean,
>> >                        maxRatePerPartition: Long,
>> >                        batchSeconds: Int
>> >                       ): DStream[DecodedData] = {
>> >   val streams: Array[DStream[DecodedData]] =
>> >     brokersToUse.map(brokers => {
>> >       val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
>> >       val kafkaParameters: Map[String, String] =
>> >         getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
>> >       logger.info(s"Kafka Params: ${kafkaParameters}")
>> >       val topics = topicsToUse.toSet
>> >       logger.info(s"Creating Kafka direct connection - " +
>> >         s"${kafkaParameters.mkString(GeneralConstants.comma)} " +
>> >         s"topics: ${topics.mkString(GeneralConstants.comma)} " +
>> >         s"w/ applicationGroup: ${groupId}")
>> >
>> >       streamMap.getOrElse(StreamLookupKey(topics, brokers),
>> >         createKafkaStream(ssc, applicationName, topics, brokers,
>> >           maxRatePerPartition, batchSeconds, kafkaParameters))
>> >     })
>> >
>> >   ssc.union(streams)
>> > }
>> >
>> > private def createKafkaStream(ssc: StreamingContext, applicationName: String,
>> >                               topics: Set[String], brokers: String,
>> >                               maxRatePerPartition: Long, batchSeconds: Int,
>> >                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
>> >   logger.info(s"Creating a stream from Kafka for application ${applicationName} " +
>> >     s"w/ topic ${topics} and brokers: ${brokers.split(',').head} " +
>> >     s"with parameters: ${kafkaParameters.mkString("|")}")
>> >   try {
>> >     val consumerStrategy =
>> >       ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>> >     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
>> >       KafkaUtils.createDirectStream(ssc,
>> >         locationStrategy = LocationStrategies.PreferBrokers,
>> >         consumerStrategy = consumerStrategy)
>> >
>> >     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream,
>> >       maxRatePerPartition, batchSeconds)
>> >     val result = stream.map(addConsumerRecordMetadata)
>> >       .persist(GeneralConstants.defaultPersistenceLevel)
>> >     streamMap += StreamLookupKey(topics, brokers) -> result
>> >     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
>> >     result
>> >   } catch ErrorHandling.safelyCatch {
>> >     case e: Exception =>
>> >       logger.error("Unable to create direct stream:")
>> >       e.printStackTrace()
>> >       throw KafkaDirectStreamException(topics.toArray, brokers, e)
>> >   }
>> > }
>> >
>> > def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
>> >                        applicationName: String): Map[String, String] =
>> >   Map[String, String](
>> >     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
>> >     "enable.auto.commit" -> false.toString, // we'll commit these manually
>> >     "key.deserializer" ->
>> >       classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>> >     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
>> >     "partition.assignment.strategy" ->
>> >       classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>> >     "bootstrap.servers" -> brokers,
>> >     "group.id" -> applicationName,
>> >     "session.timeout.ms" -> 240000.toString,
>> >     "request.timeout.ms" -> 300000.toString
>> >   )
>> >
>> > Write code looks like the following:
>> >
>> > def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
>> >                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
>> >   val rddToWrite =
>> >     if (numPartitions > 0) {
>> >       rdd.repartition(numPartitions)
>> >     } else {
>> >       rdd
>> >     }
>> >
>> >   // Get the session from the current thread's session
>> >   val session = SparkSession.builder().getOrCreate()
>> >   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
>> >     StructType(Array(StructField("value", BinaryType))))
>> >   df.selectExpr("CAST('' AS STRING)", "value")
>> >     .write
>> >     .format("kafka")
>> >     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
>> >     .option("compression.type", "gzip")
>> >     .option("retries", "3")
>> >     .option("topic", topic)
>> >     .save()
>> > }
>> >
>> > Regards,
>> >
>> > Bryan Jeffrey
