This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit 8dd6f5658258bd77c07e1bbbd0d130c1e4f18c5d
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Feb 16 11:32:51 2024 +0100

    Improve Kafka Batch example
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 jbang/kafka-batch-log/BatchLog.java        |  32 ---------
 jbang/kafka-batch-log/README.adoc          | 104 +++++++++++++++--------------
 jbang/kafka-batch-log/kafka-batch-log.yaml |  25 +++++--
 3 files changed, 72 insertions(+), 89 deletions(-)

diff --git a/jbang/kafka-batch-log/BatchLog.java b/jbang/kafka-batch-log/BatchLog.java
deleted file mode 100644
index 54d89bd..0000000
--- a/jbang/kafka-batch-log/BatchLog.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package camel.example;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchLog implements Processor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
-
-    @Override
-    public void process(Exchange e) throws Exception {
-        final List<?> exchanges = e.getMessage().getBody(List.class);
-
-        // Ensure we are actually receiving what we are asking for
-        if (exchanges == null || exchanges.isEmpty()) {
-            return;
-        }
-
-        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
-        for (Object obj : exchanges) {
-            if (obj instanceof Exchange) {
-                LOG.info("Processing exchange with body {}", ((Exchange)obj).getMessage().getBody(String.class));
-            }
-        }
-    }
-
-}
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
index 31715d4..12d1a5b 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -1,6 +1,6 @@
 == Kafka Batch Consumer with Manual commit
 
-In this sample you'll use the Kafka Batch Source Kamelet in action.
+In this sample you'll see the Kafka Batch Source Kamelet in action and write the individual records of each batch to an S3 bucket.
 
 === Install JBang
 
@@ -51,13 +51,21 @@ ansible-playbook -v deploy.yaml
 
 This should start a Kafka instance for you, on your local machine.
 
+=== Set up AWS S3
+
+Create a bucket on your personal account.
+
+The Kamelet will use the default credentials provider (useDefaultCredentialsProvider), so you'll need a valid AWS credentials file on your host.
+
+Modify the kafka-batch-s3.yaml file to set the correct region and bucket name.
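+
+For reference, the default credentials provider chain resolves credentials from the environment or from the ~/.aws/credentials file. A minimal credentials file looks like the following sketch; the values are placeholders to replace with your own keys:
+
+[source,sh]
+----
+$ cat ~/.aws/credentials
+[default]
+aws_access_key_id = <your_access_key_id>
+aws_secret_access_key = <your_secret_access_key>
+----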
+
 === How to run
 
 Then you can run this example using:
 
 [source,sh]
 ----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-s3.yaml
 ----
 
 === Consumer running
@@ -66,32 +74,31 @@ You should see:
 
 [source,sh]
 ----
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Started kafka-batch-not-secured-source-1 (kafka://test-topic)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Started log-sink-2 (kamelet://source)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Started kafka-batch-manual-commit-action-3 (kamelet://source)
-2024-02-05 09:38:24.104  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 354ms (build:0ms init:0ms start:354ms)
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1707122304192
-2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
-2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
-2024-02-05 09:38:24.198  INFO 21666 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
-2024-02-05 09:38:24.475  WARN 21666 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
-2024-02-05 09:38:24.477  INFO 21666 --- [mer[test-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: VxYjgKU6RGSnOeHWuObnwA
-2024-02-05 09:38:24.483  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
-2024-02-05 09:38:24.487  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.530  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af
-2024-02-05 09:38:24.532  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
-2024-02-05 09:38:24.533  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.536  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
-2024-02-05 09:38:24.594  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af=Assignment(partitions=[test-topic-0])}
-2024-02-05 09:38:24.607  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
-2024-02-05 09:38:24.611  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
-2024-02-05 09:38:24.615  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
-2024-02-05 09:38:24.632  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
-2024-02-05 09:38:24.648  INFO 21666 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Started kafka-batch-not-secured-source-1 (kafka://test-topic)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Started aws-s3-sink-2 (kamelet://source)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Started kafka-batch-manual-commit-action-3 (kamelet://source)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 1s244ms (build:0ms init:0ms start:1s244ms)
+2024-02-16 10:19:47.418  INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-02-16 10:19:47.418  INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-02-16 10:19:47.419  INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708075187417
+2024-02-16 10:19:47.424  INFO 17500 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-02-16 10:19:47.424  INFO 17500 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-02-16 10:19:47.425  INFO 17500 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-02-16 10:19:47.693  INFO 17500 --- [mer[test-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: QKy-eUclRryoTxWZq4xsPA
+2024-02-16 10:19:47.694  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-02-16 10:19:47.697  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.710  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733
+2024-02-16 10:19:47.712  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-02-16 10:19:47.712  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.718  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=19, memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', protocol='range'}
+2024-02-16 10:19:47.725  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 19: {consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733=Assignment(partitions=[test-topic-0])}
+2024-02-16 10:19:47.733  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=19, memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', protocol='range'}
+2024-02-16 10:19:47.734  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-02-16 10:19:47.737  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-02-16 10:19:47.749  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}
+
 ----
 
 At this point we should start sending messages to the test-topic topic. We could use kcat for this.
 
@@ -101,14 +108,6 @@ At this point we should start sending messages to the test-topic topic. We could
 [source,sh]
 ----
 for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
 ----
 
-In the consumer log, once the pollTimeout of 40 s completes, you should see an output of
-
-[source,sh]
-----
-2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:07.909  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-----
-
 If you check the situation for the consumer group 'my-group' you could see that the commit happened manually by using the kafka-batch-manual-commit-action.
 [source,sh]
 ----
@@ -126,28 +125,31 @@ You could also try to send groups of 10 records to see how the batch consumer be
 for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
 ----
 
-And you should immediately see the output in group of 10 records
+When the process completes you can check your AWS bucket:
 
 [source,sh]
 ----
+$ aws s3 ls s3://<bucket_name>
+2024-02-16 10:20:57         10 test-topic-20240216102055784.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056153.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056281.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056409.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056541.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056667.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056803.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056930.txt
 .
 .
 .
 .
-2024-02-05 09:42:40.908  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.909  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.913  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.914  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.920  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.928  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.930  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.940  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.950  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.955  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-.
-.
-.
-.
+----
+
+And you can also verify the content:
+
+[source,sh]
+----
+$ aws s3 cp s3://<bucket_name>/test-topic-20240216102055784.txt -
+hello there
 ----
 
 If you check again the offset for the consumers of the my-group group you'll notice we are at offset 52 now.
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 1a779bd..a1e4e6d 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,10 +17,6 @@
 # camel-k: dependency=camel:kafka
 
-- beans:
-  - name: batchLog
-    type: "#class:camel.example.BatchLog"
-
 - route:
     id: "kafka-to-log"
     from:
@@ -34,8 +30,25 @@
       maxPollIntervalMs: 60000
       autoCommitEnable: false
      allowManualCommit: true
+      deserializeHeaders: true
     steps:
-      - bean:
-          ref: batchLog
+      - split:
+          simple: "${body}"
+          steps:
+            - setHeaders:
+                headers:
+                  - name: "kafka.TOPIC"
+                    simple: "${body.getMessage().getHeader('kafka.TOPIC')}"
+            - setHeader:
+                name: "file"
+                simple: "${headers[kafka.TOPIC]}-${date:now:yyyyMMddHHmmssSSS}.txt"
+            - setBody:
+                simple: "${body.getMessage().getBody()}"
+            - to:
+                uri: "kamelet:aws-s3-sink"
+                parameters:
+                  useDefaultCredentialsProvider: true
+                  region: "eu-west-1"
+                  bucketNameOrArn: kamelets-demo
       - to:
           uri: "kamelet:kafka-batch-manual-commit-action"
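The offset check mentioned in the README ("we are at offset 52 now") can be reproduced with the stock Kafka CLI. A typical invocation, assuming the Kafka tools are on the PATH and the broker listens on localhost:9092:

[source,sh]
----
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
----

The CURRENT-OFFSET column for partition test-topic-0 should match the number of records produced so far.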

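Since the new route writes one S3 object per record, named <topic>-<timestamp>.txt by the setHeader step in the YAML above, a quick way to pull down and inspect everything the route produced is to sync the bucket locally. A sketch, where the ./batch-output directory is just an illustrative choice:

[source,sh]
----
$ aws s3 sync s3://<bucket_name> ./batch-output
$ cat ./batch-output/test-topic-*.txt
----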