This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch kafka-batch-source in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit cb254541492e8582d639df4732cf64e159ad6a82 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Jan 30 11:28:24 2024 +0100 Support Kafka Batch as Kamelet source - Kafka Not Secured Source as batch Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- .../kafka-batch-not-secured-source.kamelet.yaml | 129 +++++++++++++++++++++ .../utils/transform/kafka/BatchManualCommit.java | 42 +++++++ 2 files changed, 171 insertions(+) diff --git a/kamelets/kafka-batch-not-secured-source.kamelet.yaml b/kamelets/kafka-batch-not-secured-source.kamelet.yaml new file mode 100644 index 00000000..14cb1040 --- /dev/null +++ b/kamelets/kafka-batch-not-secured-source.kamelet.yaml @@ -0,0 +1,129 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-batch-not-secured-source + annotations: + camel.apache.org/kamelet.support.level: "Stable" + camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBBZG9iZSBJbGx1c3RyYXRvciAxOS4wLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9uOiA2LjAwIEJ1aWxkIDApICAtLT4NCjxzdmcgdmVyc2lvbj0iMS4xIiBpZD0iTGF5ZXJfMSIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiB4bWxuczp4bGluaz0iaHR0cDovL3d3dy53My5vcmcvMTk5OS94bGluayIgeD0iMHB4IiB5PSIwcHgiDQoJIHZpZXdCb3g9IjAgMCA1MDAgNTAwIiBzdHlsZT0iZW5hYmxlLWJhY2tncm91bmQ6bmV3IDAgMCA1MD [...] + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Kafka Batch Not Secured Source" + description: |- + Receive data from Kafka topics in batch on an insecure broker and commit them manually through KafkaManualCommit. 
+ required: + - topic + - bootstrapServers + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit in the background the offsets of messages already fetched by the consumer + type: boolean + default: true + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + pollOnError: + title: Poll On Error Behavior + description: What to do if Kafka throws an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled, the Kamelet source will deserialize all message headers to String representation. 
+ type: boolean + default: true + batchSize: + title: The batch dimension + description: The maximum number of records returned in a single call to poll() + type: integer + default: 500 + pollTimeout: + title: The poll timeout interval + description: The timeout used when polling the KafkaConsumer + type: integer + default: 5000 + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.0-SNAPSHOT" + - "camel:kafka" + - "camel:core" + - "camel:kamelet" + template: + beans: + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + - name: manualCommitFactory + type: "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + maxPollRecords: "{{batchSize}}" + pollTimeoutMs: "{{pollTimeout}}" + batching: true + kafkaManualCommitFactory: "#bean:{{manualCommitFactory}}" + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink" diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/BatchManualCommit.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/BatchManualCommit.java new file mode 100644 index 00000000..f6d7d841 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/BatchManualCommit.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.transform.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+
+import java.util.List;
+/** Processor that triggers the Kafka manual commit carried by the last exchange of a batch. */
+public class BatchManualCommit implements Processor {
+    /** Reads the body as a List and, if its last element is an Exchange with a manual-commit header, commits. */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        List<?> exchanges = exchange.getMessage().getBody(List.class);
+        if (exchanges != null && !exchanges.isEmpty()) { // getBody(List.class) may return null
+            final Object last = exchanges.get(exchanges.size() - 1); // only the final entry is inspected
+            if (last instanceof Exchange element) {
+                KafkaManualCommit manual
+                        = element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                if (manual != null) {
+                    manual.commit();
+                }
+            }
+        }
+    }
+}