Shreya21Mishra commented on issue #1672:
URL: https://github.com/apache/camel-kafka-connector/issues/1672#issuecomment-2367303621
Connector Configuration:
```json
{
  "name": "Connector-for-Custom-Timer",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "sinkAsAvro",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.retry.timeout": "300000",
    "errors.retry.delay.max.ms": "60000",
    "topics": "msk.retina-shreya.cdtadlsconnectorsink.topic.internal.any.v5",
    "transforms.sinkAsAvro.type": "maersk.retina.kafka.connect.transforms.adlssink.core.SinkAvroFile",
    "transforms.sinkAsAvro.batchsize": "10",
    "camel.aggregation.timeout": "360000",
    "transforms.sinkAsAvro.converter.type": "value",
    "camel.kamelet.azure-storage-datalake-sink.accountName": "retinaconnectorpoc",
    "camel.kamelet.azure-storage-datalake-sink.clientId": "328ea5df-d095-4c9f-ba69-79c468792db9",
    "camel.kamelet.azure-storage-datalake-sink.clientSecret": "pFp8Q~rfw.DAlCYmoeRmfyZnCF6eZ1juNonYUcc0",
    "camel.kamelet.azure-storage-datalake-sink.tenantId": "05d75c05-fa1a-42e7-9cf1-eb416c396f2d",
    "camel.kamelet.azure-storage-datalake-sink.fileSystemName": "adlswithouttimeout",
    "camel.sink.header.file.extension": "avro",
    "camel.sink.header.file.location.partition.directory": "yes",
    "camel.sink.header.file.location.date.directory": "YYYY/MM/dd/HH/mm",
    "camel.sink.header.file.location.directory.path": "customDirectory1,dateDirectory,partitionDirectory",
    "camel.sink.header.file.location.custom.directory.1": "pma-offers-retina-ms"
  }
}
```
SMT for aggregation:
```java
@Override
public R apply(R record) {
    LOG.trace("Entering apply method - Thread: {}", Thread.currentThread().getName());
    LOG.debug("Thread {} acquired lock for applying record.", Thread.currentThread().getName());
    LOG.debug("Received record with key {} and value {}", record.key(), record.value());
    buffer.add(record);
    LOG.debug("Buffer size after adding new record: {}", buffer.size());
    // Check if buffer has reached the batch size or timeout has elapsed
    if (buffer.size() >= batchSize) {
        LOG.debug("Buffer size met");
        @SuppressWarnings("unchecked")
        R aggregatedRecord = (R) applyTransformation(buffer);
        LOG.debug("Aggregated record with value: {}", aggregatedRecord.value());
        buffer.clear();
        return aggregatedRecord;
    }
    LOG.trace("Exiting apply method - Thread: {}", Thread.currentThread().getName());
    return null; // Return null if no aggregation is performed
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]