This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d84e4eefb add support for multi partition kafka topics (#5398)
d84e4eefb is described below

commit d84e4eefb764b36b63867967b1882d1226481f61
Author: Brendan Doyle <bdoyle0...@gmail.com>
AuthorDate: Mon Apr 24 09:26:52 2023 -0700

    add support for multi partition kafka topics (#5398)

    Co-authored-by: Brendan Doyle <brend...@qualtrics.com>
---
 .../org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala | 1 +
 .../org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala | 2 +-
 .../org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index a538dc0cc..1f9bc2ace 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -55,6 +55,7 @@ class KafkaConsumerConnector(

   // Currently consumed offset, is used to calculate the topic lag.
   // It is updated from one thread in "peek", no concurrent data structure is necessary
+  // Note: Currently, this value used for metric reporting will not be accurate if using a multi-partition topic.
   private var offset: Long = 0

   // Markers for metrics, initialized only once
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index addf680d3..f61f6ec7a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -64,7 +64,7 @@ object KafkaMessagingProvider extends MessagingProvider {

     Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
       .flatMap(client => {
-        val partitions = 1
+        val partitions = topicConfig.getOrElse("partitions", "1").toInt
         val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)

         def createTopic(retries: Int = 5): Try[Unit] = {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
index 5b616113e..3c4a410d0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
@@ -51,7 +51,7 @@ class KafkaProducerConnector(
   /** Sends msg to topic. This is an asynchronous operation. */
   override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
-    val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
+    val record = new ProducerRecord[String, String](topic, msg.serialize)
     val produced = Promise[ResultMetadata]()

     Future {