Shreya21Mishra commented on issue #1672:
URL: 
https://github.com/apache/camel-kafka-connector/issues/1672#issuecomment-2367303621

   Connector Configuration: 
   `{
     "name": "Connector-for-Custom-Timer",
     "config": {
       "connector.class": 
"org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector",
       "tasks.max": "1",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.storage.StringConverter",
       "transforms": "sinkAsAvro",
       "errors.tolerance": "none",
       "errors.log.enable": "true",
       "errors.log.include.messages": "true",
       "errors.retry.timeout": "300000",
       "errors.retry.delay.max.ms": "60000",
       "topics": "msk.retina-shreya.cdtadlsconnectorsink.topic.internal.any.v5",
       "transforms.sinkAsAvro.type": 
"maersk.retina.kafka.connect.transforms.adlssink.core.SinkAvroFile",
       "transforms.sinkAsAvro.batchsize": "10",
       "camel.aggregation.timeout": "360000",
       "transforms.sinkAsAvro.converter.type": "value",
       "camel.kamelet.azure-storage-datalake-sink.accountName": 
"retinaconnectorpoc",
       "camel.kamelet.azure-storage-datalake-sink.clientId": 
"328ea5df-d095-4c9f-ba69-79c468792db9",
       "camel.kamelet.azure-storage-datalake-sink.clientSecret": 
"pFp8Q~rfw.DAlCYmoeRmfyZnCF6eZ1juNonYUcc0",
       "camel.kamelet.azure-storage-datalake-sink.tenantId": 
"05d75c05-fa1a-42e7-9cf1-eb416c396f2d",
       "camel.kamelet.azure-storage-datalake-sink.fileSystemName": 
"adlswithouttimeout",
       "camel.sink.header.file.extension": "avro",
       "camel.sink.header.file.location.partition.directory": "yes",
       "camel.sink.header.file.location.date.directory": "YYYY/MM/dd/HH/mm",
       "camel.sink.header.file.location.directory.path": 
"customDirectory1,dateDirectory,partitionDirectory",
       "camel.sink.header.file.location.custom.directory.1": 
"pma-offers-retina-ms"
     }
   }`
   
   SMT for aggragation:
   `    @Override
       public R apply(R record) {
           LOG.trace("Entering apply method - Thread: {}", 
Thread.currentThread().getName());
               LOG.debug("Thread {} acquired lock for applying record.", 
Thread.currentThread().getName());
               LOG.debug("Received record with key {} and value {}", 
record.key(), record.value());
               buffer.add(record);
               LOG.debug("Buffer size after adding new record: {}", 
buffer.size());
   
               // Check if buffer has reached the batch size or timeout has 
elapsed
               if (buffer.size() >= batchSize) {
                   LOG.debug("Buffer size met");
   
                   R aggregatedRecord = (R) applyTransformation(buffer);
                   LOG.debug("Aggregated record with value: {}", 
aggregatedRecord.value());
                   buffer.clear();
                   return aggregatedRecord;
               } 
               }
           
           LOG.trace("Exiting apply method - Thread: {}", 
Thread.currentThread().getName());
           return null; // Return null if no aggregation is performed
       }`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to