This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d84e4eefb add support for multi partition kafka topics (#5398)
d84e4eefb is described below

commit d84e4eefb764b36b63867967b1882d1226481f61
Author: Brendan Doyle <bdoyle0...@gmail.com>
AuthorDate: Mon Apr 24 09:26:52 2023 -0700

    add support for multi partition kafka topics (#5398)
    
    Co-authored-by: Brendan Doyle <brend...@qualtrics.com>
---
 .../org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala   | 1 +
 .../org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala   | 2 +-
 .../org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala   | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index a538dc0cc..1f9bc2ace 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -55,6 +55,7 @@ class KafkaConsumerConnector(
 
   // Currently consumed offset, is used to calculate the topic lag.
   // It is updated from one thread in "peek", no concurrent data structure is necessary
+  // Note: Currently, this value used for metric reporting will not be accurate if using a multi-partition topic.
   private var offset: Long = 0
 
   // Markers for metrics, initialized only once
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index addf680d3..f61f6ec7a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -64,7 +64,7 @@ object KafkaMessagingProvider extends MessagingProvider {
 
     Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
       .flatMap(client => {
-        val partitions = 1
+        val partitions = topicConfig.getOrElse("partitions", "1").toInt
         val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
 
         def createTopic(retries: Int = 5): Try[Unit] = {
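
    [Illustration, not part of the commit: the change above means the per-topic config can now carry a
    "partitions" entry that drives topic creation, falling back to the previous single-partition default.
    A minimal sketch, assuming a plain Map[String, String] topicConfig and placeholder topic name and
    replication factor:

        import org.apache.kafka.clients.admin.NewTopic

        // Illustrative per-topic settings; real values come from the deployment's Kafka config.
        val topicConfig: Map[String, String] = Map("partitions" -> "3")

        // Same rule as the diff: honour "partitions" when present, otherwise keep the old default of 1.
        val partitions: Int = topicConfig.getOrElse("partitions", "1").toInt

        // Placeholder topic name and replication factor for this sketch; the real call also applies
        // the remaining topic settings via .configs(...), as in the diff above.
        val newTopic = new NewTopic("completed0", partitions, 1.toShort)
    ]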
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
index 5b616113e..3c4a410d0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
@@ -51,7 +51,7 @@ class KafkaProducerConnector(
   /** Sends msg to topic. This is an asynchronous operation. */
   override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
-    val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
+    val record = new ProducerRecord[String, String](topic, msg.serialize)
     val produced = Promise[ResultMetadata]()
 
     Future {

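[Illustration, not part of the commit: the producer change matters for multi-partition topics because,
with the old constant key "messages", Kafka's default partitioner hashed every record to the same
partition, so additional partitions would have gone unused; with no key, the producer spreads records
across the topic's partitions. A minimal sketch with placeholder topic and payload:

    import org.apache.kafka.clients.producer.ProducerRecord

    // Placeholder values; in OpenWhisk the payload is msg.serialize.
    val topic = "invoker0"
    val payload = """{"example": "activation"}"""

    // Old behaviour: a fixed key meant every record hashed to the same partition.
    val keyed = new ProducerRecord[String, String](topic, "messages", payload)

    // New behaviour: no key, so the partitioner distributes records across partitions
    // (sticky partitioning in recent Kafka clients, round-robin in older ones).
    val unkeyed = new ProducerRecord[String, String](topic, payload)
]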