This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 8dd6f5658258bd77c07e1bbbd0d130c1e4f18c5d
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Feb 16 11:32:51 2024 +0100

    Improve Kafka Batch example
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 jbang/kafka-batch-log/BatchLog.java        |  32 ---------
 jbang/kafka-batch-log/README.adoc          | 104 +++++++++++++++--------------
 jbang/kafka-batch-log/kafka-batch-log.yaml |  25 +++++--
 3 files changed, 72 insertions(+), 89 deletions(-)

diff --git a/jbang/kafka-batch-log/BatchLog.java 
b/jbang/kafka-batch-log/BatchLog.java
deleted file mode 100644
index 54d89bd..0000000
--- a/jbang/kafka-batch-log/BatchLog.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package camel.example;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchLog implements Processor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
-
-    @Override
-    public void process(Exchange e) throws Exception {
-        final List<?> exchanges = e.getMessage().getBody(List.class);
-
-        // Ensure we are actually receiving what we are asking for
-        if (exchanges == null || exchanges.isEmpty()) {
-            return;
-        }
-
-        // The records from the batch are stored in a list of exchanges in the 
original exchange. To process, we iterate over that list
-        for (Object obj : exchanges) {
-            if (obj instanceof Exchange) {
-                LOG.info("Processing exchange with body {}", 
((Exchange)obj).getMessage().getBody(String.class));
-            }
-        }
-    }
-
-}
diff --git a/jbang/kafka-batch-log/README.adoc 
b/jbang/kafka-batch-log/README.adoc
index 31715d4..12d1a5b 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -1,6 +1,6 @@
 == Kafka Batch Consumer with Manual commit
 
-In this sample you'll use the Kafka Batch Source Kamelet in action.
+In this sample you'll see the Kafka Batch Source Kamelet in action and write 
the individual records of the batch into an S3 bucket.
 
 === Install JBang
 
@@ -51,13 +51,21 @@ ansible-playbook -v deploy.yaml
 
 This should start a Kafka instance for you, on your local machine.
 
+=== Set up AWS S3
+
+Create a bucket on your personal account.
+
+The Kamelet will use the defaultCredentialsProvider, so you'll need the 
credentials file on your host.
+
+Modify the kafka-batch-s3.yaml file to add the correct region and the correct 
bucket name.
+
 === How to run
 
 Then you can run this example using:
 
 [source,sh]
 ----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run 
--local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java 
kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run 
--local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-s3.yaml
 ----
 
 === Consumer running
@@ -66,32 +74,31 @@ You should see:
 
 [source,sh]
 ----
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] 
el.impl.engine.AbstractCamelContext : Routes startup (started:4)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started kafka-to-log 
(kamelet://kafka-batch-not-secured-source)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started 
kafka-batch-not-secured-source-1 (kafka://test-topic)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started log-sink-2 (kamelet://source)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started 
kafka-batch-manual-commit-action-3 (kamelet://source)
-2024-02-05 09:38:24.104  INFO 21666 --- [           main] 
el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT 
(kafka-batch-log) started in 354ms (build:0ms init:0ms start:354ms)
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] 
he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] 
he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] 
he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1707122304192
-2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] 
ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
-2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] 
l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic 
test-topic
-2024-02-05 09:38:24.198  INFO 21666 --- [mer[test-topic]] 
afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Subscribed to topic(s): test-topic
-2024-02-05 09:38:24.475  WARN 21666 --- [mer[test-topic]] 
.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Error while fetching metadata with correlation id 2 : 
{test-topic=LEADER_NOT_AVAILABLE}
-2024-02-05 09:38:24.477  INFO 21666 --- [mer[test-topic]] 
org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Cluster ID: VxYjgKU6RGSnOeHWuObnwA
-2024-02-05 09:38:24.483  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: 
null)
-2024-02-05 09:38:24.487  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.530  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Request joining group due to: need to re-join with the given 
member-id: consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af
-2024-02-05 09:38:24.532  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Request joining group due to: rebalance failed due to 'The 
group member needs to have a valid member id before actually entering a 
consumer group.' (MemberIdRequiredException)
-2024-02-05 09:38:24.533  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.536  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Successfully joined group with generation 
Generation{generationId=1, 
memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', 
protocol='range'}
-2024-02-05 09:38:24.594  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Finished assignment for group at generation 1: 
{consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af=Assignment(partitions=[test-topic-0])}
-2024-02-05 09:38:24.607  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Successfully synced group in generation 
Generation{generationId=1, 
memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', 
protocol='range'}
-2024-02-05 09:38:24.611  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Notifying assignor about the new 
Assignment(partitions=[test-topic-0])
-2024-02-05 09:38:24.615  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Adding newly assigned partitions: test-topic-0
-2024-02-05 09:38:24.632  INFO 21666 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Found no committed offset for partition test-topic-0
-2024-02-05 09:38:24.648  INFO 21666 --- [mer[test-topic]] 
onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Resetting offset for partition test-topic-0 to position 
FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], 
epoch=0}}.
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] 
el.impl.engine.AbstractCamelContext : Routes startup (started:4)
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started kafka-to-log 
(kamelet://kafka-batch-not-secured-source)
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started 
kafka-batch-not-secured-source-1 (kafka://test-topic)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started aws-s3-sink-2 
(kamelet://source)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] 
el.impl.engine.AbstractCamelContext :     Started 
kafka-batch-manual-commit-action-3 (kamelet://source)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] 
el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT 
(kafka-batch-log) started in 1s244ms (build:0ms init:0ms start:1s244ms)
+2024-02-16 10:19:47.418  INFO 17500 --- [mer[test-topic]] 
he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-02-16 10:19:47.418  INFO 17500 --- [mer[test-topic]] 
he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-02-16 10:19:47.419  INFO 17500 --- [mer[test-topic]] 
he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708075187417
+2024-02-16 10:19:47.424  INFO 17500 --- [mer[test-topic]] 
ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-02-16 10:19:47.424  INFO 17500 --- [mer[test-topic]] 
l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic 
test-topic
+2024-02-16 10:19:47.425  INFO 17500 --- [mer[test-topic]] 
afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Subscribed to topic(s): test-topic
+2024-02-16 10:19:47.693  INFO 17500 --- [mer[test-topic]] 
org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Cluster ID: QKy-eUclRryoTxWZq4xsPA
+2024-02-16 10:19:47.694  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: 
null)
+2024-02-16 10:19:47.697  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.710  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Request joining group due to: need to re-join with the given 
member-id: consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733
+2024-02-16 10:19:47.712  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Request joining group due to: rebalance failed due to 'The 
group member needs to have a valid member id before actually entering a 
consumer group.' (MemberIdRequiredException)
+2024-02-16 10:19:47.712  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.718  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Successfully joined group with generation 
Generation{generationId=19, 
memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', 
protocol='range'}
+2024-02-16 10:19:47.725  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Finished assignment for group at generation 19: 
{consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733=Assignment(partitions=[test-topic-0])}
+2024-02-16 10:19:47.733  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Successfully synced group in generation 
Generation{generationId=19, 
memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', 
protocol='range'}
+2024-02-16 10:19:47.734  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Notifying assignor about the new 
Assignment(partitions=[test-topic-0])
+2024-02-16 10:19:47.737  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-02-16 10:19:47.749  INFO 17500 --- [mer[test-topic]] 
sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, 
groupId=my-group] Setting offset for partition test-topic-0 to the committed 
offset FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], 
epoch=0}}
+
 ----
 
 At this point we should start sending messages to the test-topic topic. We 
could use kcat for this.
@@ -101,14 +108,6 @@ At this point we should start sending messages to the 
test-topic topic. We could
 for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t 
test-topic; done
 ----
 
-In the consumer log, once the pollTimeout of 40 s completes, you should see an 
output of
-
-[source,sh]
-----
-2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:07.909  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-----
-
 If you check the situation for the consumer group 'my-group' you could see 
that the commit happened manually by using the kafka-batch-manual-commit-action.
 
 [source,sh]
@@ -126,28 +125,31 @@ You could also try to send groups of 10 records to see 
how the batch consumer be
 for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t 
test-topic; done
 ----
 
-And you should immediately see the output in group of 10 records
+When the process completes, you can check your AWS bucket:
 
 [source,sh]
 ----
+$ aws s3 ls s3://<bucket_name>
+2024-02-16 10:20:57         10 test-topic-20240216102055784.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056153.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056281.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056409.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056541.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056667.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056803.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056930.txt
 .
 .
 .
 .
-2024-02-05 09:42:40.908  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.909  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.913  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.914  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.920  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.928  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.930  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.940  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.950  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.955  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
-.
-.
-.
-.
+----
+
+And you can also verify the content:
+
+[source,sh]
+----
+$ aws s3 cp s3://<bucket_name>/test-topic-20240216102055784.txt -
+hello there
 ----
 
 If you check again the offset for the consumers of my-group group you'll 
notice we are at offset 52 now.
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml 
b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 1a779bd..a1e4e6d 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,10 +17,6 @@
 
 # camel-k: dependency=camel:kafka
 
-- beans:
-  - name: batchLog
-    type: "#class:camel.example.BatchLog"
-
 - route:
     id: "kafka-to-log"
     from:
@@ -34,8 +30,25 @@
         maxPollIntervalMs: 60000
         autoCommitEnable: false
         allowManualCommit: true
+        deserializeHeaders: true
       steps:
-        - bean:
-            ref: batchLog
+        - split:
+            simple: "${body}"
+            steps:
+              - setHeaders:
+                  headers:
+                    - name: "kafka.TOPIC"
+                      simple: "${body.getMessage().getHeader('kafka.TOPIC')}"
+              - setHeader:
+                  name: "file"
+                  simple: 
"${headers[kafka.TOPIC]}-${date:now:yyyyMMddHHmmssSSS}.txt"
+              - setBody:
+                  simple: "${body.getMessage().getBody()}"
+              - to:
+                  uri: "kamelet:aws-s3-sink"
+                  parameters:
+                    useDefaultCredentialsProvider: true
+                    region: "eu-west-1"
+                    bucketNameOrArn: kamelets-demo             
         - to:
             uri: "kamelet:kafka-batch-manual-commit-action"

Reply via email to