This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1aaa48427ea [improve][io] Add configuration parameter for disabling aggregation for Kinesis Producers (#24289)
1aaa48427ea is described below
commit 1aaa48427ea5e671171bcabb390767b43aa0d576
Author: Malla Sandeep <[email protected]>
AuthorDate: Wed May 14 23:20:43 2025 +0530
[improve][io] Add configuration parameter for disabling aggregation for Kinesis Producers (#24289)
(cherry picked from commit 18438bbab505ded332daf856b78b2ed84aaff51c)
---
.../src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 1 +
.../main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java | 6 ++++++
2 files changed, 7 insertions(+)
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index fb8eedff82f..34a360c723a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -182,6 +182,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObj
kinesisSinkConfig.getAwsCredentialPluginParam())
.getCredentialProvider();
kinesisConfig.setCredentialsProvider(credentialsProvider);
+ kinesisConfig.setAggregationEnabled(kinesisSinkConfig.isAggregationEnabled());
this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
this.kinesisProducer = new KinesisProducer(kinesisConfig);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index c5b26a26d0c..eb08496dbcd 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -145,4 +145,10 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
FULL_MESSAGE_IN_JSON_EXPAND_VALUE
}
+
+ @FieldDoc(
+ defaultValue = "true",
+ help = "Enable aggregation. With aggregation, multiple user records could be packed into a single\n"
+ + " KinesisRecord. If disabled, each user record is sent in its own KinesisRecord.")
+ private boolean aggregationEnabled = true;
}