Hi,

I am a colleague of Ian's. We use the following processing pipeline in
the streams app he mentions:
https://github.com/zalando-incubator/pipeline-backbone

The streams are built using:

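import java.util.concurrent.CountDownLatch
import scala.util.control.NonFatal
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder

object Run extends App {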
  // ...

  private val latch = new CountDownLatch(1)

  private val builder = {
    val b = new KStreamBuilder()
    TextPipeline.initializePipelineKStream(
      b,
      Serdes.serdeFrom(new TextPipelineSerializer(), new TextPipelineDeserializer()),
      Some(latch)
    )
    b
  }

  private val streams = {
    val s = TextPipeline.createKStreams(builder, props)
    s.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        log.error(s"Unexpected Exception caught in thread [${t.getName}]:", e)
      }
    })
    s
  }

  sys.addShutdownHook(shutdown())

  try {
    log.info("Starting...")
    streams.start()
    loopForever()
  } catch {
    case NonFatal(e) => log.error("Exception starting kafka streams", e)
  } finally {
    streams.close()
  }

  def shutdown(): Unit = {
    log.warn("Shutting down the system")
    streams.close()
  }

}


TextPipeline.initializePipelineKStream(builder: KStreamBuilder,
    tpdSerde: Serde[TextPipelineDatum], latch: Option[CountDownLatch]) = {
  // KafkaStreamsBackboneCoordinator is from the linked pipeline
  val coord = new KafkaStreamsBackboneCoordinator(backbone, latch)
  val kstream = builder.stream(new StringSerde, tpdSerde, INPUT_TOPIC)
  kstream
    .transformValues[Xor[TransformationPipelineFailure, TextPipelineDatum]](coord)
    // KafkaStreamValidDatumValueMapper is from the linked pipeline
    .flatMapValues(new KafkaStreamValidDatumValueMapper[TextPipelineDatum])
}

TextPipeline.createKStreams(builder: KStreamBuilder, props: Properties): KafkaStreams =
  new KafkaStreams(builder, new StreamsConfig(props))


The stream processing being done is quite slow and intensive (it can take
on the order of minutes). We also have a large number of partitions on
our input topic (96), with replication factor 3. We have dropped our
max.poll.records to 10 and still see this problem.
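
For reference, here is a minimal sketch of how such a setting can be expressed, assuming the props handed to createKStreams is a plain java.util.Properties and that consumer settings placed there are passed through to the underlying consumer. The names PollSettings and maxPollRecordsProps are hypothetical and not taken from the application:

```scala
import java.util.Properties

// Hypothetical helper for illustration only; not part of the application above.
object PollSettings {
  // Builds a Properties object that caps how many records a single
  // consumer poll() call may return.
  def maxPollRecordsProps(maxPollRecords: Int): Properties = {
    val p = new Properties()
    p.setProperty("max.poll.records", maxPollRecords.toString)
    p
  }
}
```

These entries would be merged into the props given to new StreamsConfig(props). Note that if a single record takes minutes to process, even ten records per poll can still leave a long gap between consecutive polls.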

Thanks for your help and best regards,

Nina

On 13 February 2017 at 08:09, Eno Thereska <eno.there...@gmail.com> wrote:
> +1 (non binding)
>
> Checked streams. Verified that stream tests work and examples off 
> confluentinc/examples/kafka-streams work.
>
> Thanks
> Eno
>
>> On 10 Feb 2017, at 16:51, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>>
>> Hello Kafka users, developers and client-developers,
>>
>> This is RC1 for release of Apache Kafka 0.10.2.0.
>>
>> This is a minor version release of Apache Kafka. It includes 19 new KIPs.
>> See the release notes and release plan (https://cwiki.apache.org/
>> confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A few
>> feature highlights: SASL-SCRAM support, improved client compatibility to
>> allow use of clients newer than the broker, session windows and global
>> tables in the Kafka Streams API, single message transforms in the Kafka
>> Connect framework.
>>
>> Important note: in addition to the artifacts generated using JDK7 for Scala
>> 2.10 and 2.11, this release also includes experimental artifacts built
>> using JDK8 for Scala 2.12.
>>
>> Important code changes since RC0 (non-docs, non system tests):
>>
>> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
>> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
>> * KAFKA-4734; Trim the time index on old segments
>> * KAFKA-4725; Stop leaking messages in produce request body when requests
>> are delayed
>> * KAFKA-4716: Fix case when controller cannot be reached
>>
>> Release notes for the 0.10.2.0 release:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>>
>> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>>
>>
>> * Documentation:
>> http://kafka.apache.org/0102/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/0102/protocol.html
>>
>> * Successful Jenkins builds for the 0.10.2 branch:
>> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
>> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/
>>
>> /**************************************
>>
>> Thanks,
>> Ewen
>



-- 
By monitor glow
On conference paper due
A grad student naps.
                 -- PhD Haiku
