Just to be 100% sure, when you're logging the group id in createDirectStream, you no longer see any duplicates?
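[For readers following along: the group-id collision Cody describes can be sketched as below. This is a hypothetical illustration, not Bryan's actual code; the helper names are invented, and the hash-suffix variant is just one way a cluster identifier could be folded in.]

```scala
// Sketch of the group id issue under discussion (hypothetical helper names).
// As written in the original code, the group id depends only on application
// name and topics, so every cluster in brokersToUse shares one consumer group:
def groupIdUnqualified(applicationName: String, topicsToUse: Array[String]): String =
  s"${applicationName}~${topicsToUse.mkString("~")}"

// One illustrative way to disambiguate: include something derived from the
// broker string, so each cluster's direct stream gets a distinct group id.
def groupIdPerCluster(applicationName: String, topicsToUse: Array[String], brokers: String): String =
  s"${applicationName}~${topicsToUse.mkString("~")}~${brokers.hashCode.toHexString}"
```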
Regarding testing master: is the blocker that your Spark cluster is on 2.3? There's at least a reasonable chance that building an application assembly jar that uses the master version just for the spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster.

On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Cody,
>
> We are connecting to multiple clusters for each topic. This morning I experimented both with adding a cluster identifier to the group id and with moving to use only a single one of our clusters. Neither was successful. I am not able to run a test against master right now.
>
> Regards,
>
> Bryan Jeffrey
>
> On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger <c...@koeninger.org> wrote:
>> I doubt that fix will get backported to 2.3.x.
>>
>> Are you able to test against master? 2.4, with the fix you linked to, is likely to hit code freeze soon.
>>
>> From a quick look at your code, I'm not sure why you're mapping over an array of brokers. It seems like that would result in different streams with the same group id, because the broker isn't part of your group id string.
>>
>> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>>> Hello, Spark Users.
>>>
>>> We have an application using Spark 2.3.0 and the 0.8 Kafka client. We have a Spark streaming job, and we're reading a reasonable amount of data from Kafka (40 GB / minute or so). We would like to move to the Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to convert message formats.
>>>
>>> We've run into https://issues.apache.org/jira/browse/SPARK-19185, 'ConcurrentModificationExceptions with CachedKafkaConsumers'. I've tried to work around it as follows:
>>>
>>> 1. Disabled consumer caching. This increased the total job time from ~1 minute per batch to ~1.8 minutes per batch. That performance penalty is unacceptable for our use case. We also saw some partitions stop receiving for an extended period of time, though I was unable to get a simple repro for that effect.
>>>
>>> 2. Disabled speculation and multiple-job concurrency, and added caching for the stream directly after reading from Kafka, as well as caching offsets. This approach works well for simple examples (read from a Kafka topic, write to another topic). However, when we move to more complex logic we continue to see this type of error, despite only creating the stream for a given topic a single time. We validated that we create the stream for a given topic/partition a single time by logging on stream creation, caching the stream, and (eventually) calling 'runJob' to actually go and fetch the data. Nonetheless, with multiple outputs we see the ConcurrentModificationException.
>>>
>>> I've included some code below. I would be happy if anyone had debugging tips for the workaround. However, my main concern is to ensure that the 2.4 version will have a bug fix that works for Spark Streaming in which multiple input topics map data to multiple outputs. I would also like to understand whether the fix (https://github.com/apache/spark/pull/20997) will be backported to Spark 2.3.x.
>>>
>>> In our code, the read looks like the following:
>>>
>>> case class StreamLookupKey(topic: Set[String], brokers: String)
>>>
>>> private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
>>>
>>> // Given inputs, return a direct stream.
>>> def createDirectStream(ssc: StreamingContext,
>>>                        additionalKafkaParameters: Map[String, String],
>>>                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
>>>                        topicsToUse: Array[String],
>>>                        applicationName: String,
>>>                        persist: Option[PersistenceManager],
>>>                        useOldestOffsets: Boolean,
>>>                        maxRatePerPartition: Long,
>>>                        batchSeconds: Int): DStream[DecodedData] = {
>>>   val streams: Array[DStream[DecodedData]] =
>>>     brokersToUse.map(brokers => {
>>>       val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
>>>       val kafkaParameters: Map[String, String] =
>>>         getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
>>>       logger.info(s"Kafka Params: ${kafkaParameters}")
>>>       val topics = topicsToUse.toSet
>>>       logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
>>>         s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
>>>
>>>       streamMap.getOrElse(StreamLookupKey(topics, brokers),
>>>         createKafkaStream(ssc, applicationName, topics, brokers,
>>>           maxRatePerPartition, batchSeconds, kafkaParameters))
>>>     })
>>>
>>>   ssc.union(streams)
>>> }
>>>
>>> private def createKafkaStream(ssc: StreamingContext, applicationName: String,
>>>                               topics: Set[String], brokers: String,
>>>                               maxRatePerPartition: Long, batchSeconds: Int,
>>>                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
>>>   logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
>>>     s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
>>>   try {
>>>     val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>>>     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
>>>       KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers,
>>>         consumerStrategy = consumerStrategy)
>>>
>>>     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
>>>     val result =
>>>       stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
>>>     streamMap += StreamLookupKey(topics, brokers) -> result
>>>     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
>>>     result
>>>   } catch ErrorHandling.safelyCatch {
>>>     case e: Exception =>
>>>       logger.error("Unable to create direct stream:")
>>>       e.printStackTrace()
>>>       throw KafkaDirectStreamException(topics.toArray, brokers, e)
>>>   }
>>> }
>>>
>>> def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
>>>                        applicationName: String): Map[String, String] =
>>>   Map[String, String](
>>>     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
>>>     "enable.auto.commit" -> false.toString, // we'll commit these manually
>>>     "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
>>>     "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>     "bootstrap.servers" -> brokers,
>>>     "group.id" -> applicationName,
>>>     "session.timeout.ms" -> 240000.toString,
>>>     "request.timeout.ms" -> 300000.toString
>>>   )
>>>
>>> The write code looks like the following:
>>>
>>> def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
>>>                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
>>>   val rddToWrite =
>>>     if (numPartitions > 0) {
>>>       rdd.repartition(numPartitions)
>>>     } else {
>>>       rdd
>>>     }
>>>
>>>   // Get the session for the current thread
>>>   val session = SparkSession.builder().getOrCreate()
>>>   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
>>>     StructType(Array(StructField("value", BinaryType))))
>>>   df.selectExpr("CAST('' AS STRING)", "value")
>>>     .write
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
>>>     .option("compression.type", "gzip")
>>>     .option("retries", "3")
>>>     .option("topic", topic)
>>>     .save()
>>> }
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
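[For readers following along: the two workarounds described in the thread correspond to standard Spark configuration flags. A minimal sketch, assuming the Spark 2.3-era setting names for the 0-10 connector; values are illustrative only:]

// Sketch of the settings touched by the workarounds discussed above.
val conf = new org.apache.spark.SparkConf()
  .setAppName("kafka-010-repro") // illustrative app name
  // Workaround 1: disable the cached Kafka consumers implicated in SPARK-19185.
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
  // Workaround 2: disable speculation and multiple-job concurrency.
  .set("spark.speculation", "false")
  .set("spark.streaming.concurrentJobs", "1")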