This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d84e4eefb add support for multi partition kafka topics (#5398)
d84e4eefb is described below
commit d84e4eefb764b36b63867967b1882d1226481f61
Author: Brendan Doyle <[email protected]>
AuthorDate: Mon Apr 24 09:26:52 2023 -0700
add support for multi partition kafka topics (#5398)
Co-authored-by: Brendan Doyle <[email protected]>
---
.../org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala | 1 +
.../org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala | 2 +-
.../org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala | 2 +-
3 files changed, 3 insertions(+), 2 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index a538dc0cc..1f9bc2ace 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -55,6 +55,7 @@ class KafkaConsumerConnector(
// Currently consumed offset, is used to calculate the topic lag.
// It is updated from one thread in "peek", no concurrent data structure is
necessary
+ // Note: Currently, this value used for metric reporting will not be
accurate if using a multi-partition topic.
private var offset: Long = 0
// Markers for metrics, initialized only once
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index addf680d3..f61f6ec7a 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -64,7 +64,7 @@ object KafkaMessagingProvider extends MessagingProvider {
Try(AdminClient.create(commonConfig +
(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
.flatMap(client => {
- val partitions = 1
+ val partitions = topicConfig.getOrElse("partitions", "1").toInt
val nt = new NewTopic(topic, partitions,
kafkaConfig.replicationFactor).configs(topicConfig.asJava)
def createTopic(retries: Int = 5): Try[Unit] = {
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
index 5b616113e..3c4a410d0 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
@@ -51,7 +51,7 @@ class KafkaProducerConnector(
/** Sends msg to topic. This is an asynchronous operation. */
override def send(topic: String, msg: Message, retry: Int = 3):
Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
- val record = new ProducerRecord[String, String](topic, "messages",
msg.serialize)
+ val record = new ProducerRecord[String, String](topic, msg.serialize)
val produced = Promise[ResultMetadata]()
Future {