This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new e47f9c65eb improve logging and code organization
e47f9c65eb is described below

commit e47f9c65eb7bc043a1cd6b093871279535d202d0
Author: yasithdev <[email protected]>
AuthorDate: Tue Jul 15 03:14:33 2025 -0500

    improve logging and code organization
---
 .../impl/task/parsing/ParsingTriggeringTask.java   | 24 +++++++++++++++-------
 .../helix/impl/workflow/PostWorkflowManager.java   | 22 +++++++++++---------
 .../airavata/monitor/kafka/MessageProducer.java    | 14 ++++++++-----
 .../airavata/monitor/realtime/RealtimeMonitor.java | 18 ++++++++--------
 .../file/server/FileServerConfiguration.java       |  5 +++--
 .../restproxy/controller/ProxyController.java      | 15 +++++++++-----
 6 files changed, 59 insertions(+), 39 deletions(-)

diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index f0d3ea37b6..4ab3d10b18 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -37,9 +37,18 @@ import org.slf4j.LoggerFactory;
 @TaskDef(name = "Parsing Triggering Task")
 public class ParsingTriggeringTask extends AiravataTask {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(DataParsingTask.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(ParsingTriggeringTask.class);
 
     private static Producer<String, ProcessCompletionMessage> producer;
+    private final String topic;
+
+    public ParsingTriggeringTask() {
+        try {
+            topic = ServerSettings.getSetting("data.parser.topic");
+        } catch (ApplicationSettingsException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     private void createProducer() throws ApplicationSettingsException {
 
@@ -49,22 +58,23 @@ public class ParsingTriggeringTask extends AiravataTask {
             props.put(ProducerConfig.CLIENT_ID_CONFIG, 
ServerSettings.getSetting("data.parser.broker.publisher.id"));
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ProcessCompletionMessageSerializer.class.getName());
-            producer = new KafkaProducer<String, 
ProcessCompletionMessage>(props);
+            producer = new KafkaProducer<>(props);
         }
     }
 
     public void submitMessageToParserEngine(ProcessCompletionMessage 
completionMessage)
-            throws ExecutionException, InterruptedException, 
ApplicationSettingsException {
-        final ProducerRecord<String, ProcessCompletionMessage> record = new 
ProducerRecord<>(
-                ServerSettings.getSetting("data.parser.topic"), 
completionMessage.getExperimentId(), completionMessage);
-        RecordMetadata recordMetadata = producer.send(record).get();
+            throws ExecutionException, InterruptedException {
+        var experimentId = completionMessage.getExperimentId();
+        var record = new ProducerRecord<>(topic, experimentId, 
completionMessage);
+        producer.send(record).get();
+        logger.info("ParsingTriggeringTask posted to {}: {}->{}", topic, 
experimentId, completionMessage);
         producer.flush();
     }
 
     @Override
     public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
 
-        logger.info("Starting parsing triggerring task " + getTaskId() + ", 
experiment id " + getExperimentId());
+        logger.info("Starting parsing triggering task {}, experiment id {}", 
getTaskId(), getExperimentId());
 
         ProcessCompletionMessage completionMessage = new 
ProcessCompletionMessage();
         completionMessage.setExperimentId(getExperimentId());
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index ccaf671aed..a23d1b5edf 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -51,7 +51,6 @@ import org.apache.airavata.patform.monitoring.CountMonitor;
 import org.apache.airavata.patform.monitoring.MonitoringServer;
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -338,16 +337,19 @@ public class PostWorkflowManager extends WorkflowManager {
         new Thread(() -> {
                     while (true) {
                         final ConsumerRecords<String, JobStatusResult> 
consumerRecords = consumer.poll(Long.MAX_VALUE);
-                        CompletionService<Boolean> executorCompletionService =
-                                new 
ExecutorCompletionService<>(processingPool);
-                        List<Future<Boolean>> processingFutures = new 
ArrayList<>();
+                        var executorCompletionService = new 
ExecutorCompletionService<>(processingPool);
+                        var processingFutures = new ArrayList<>();
 
-                        for (TopicPartition partition : 
consumerRecords.partitions()) {
-                            List<ConsumerRecord<String, JobStatusResult>> 
partitionRecords =
-                                    consumerRecords.records(partition);
+                        for (var topicPartition : 
consumerRecords.partitions()) {
+                            var partitionRecords = 
consumerRecords.records(topicPartition);
                             logger.info("Received job records {}", 
partitionRecords.size());
 
-                            for (ConsumerRecord<String, JobStatusResult> 
record : partitionRecords) {
+                            for (var record : partitionRecords) {
+                                var topic = topicPartition.topic();
+                                var partition = topicPartition.partition();
+                                var key = record.key();
+                                var value = record.value();
+                                logger.info("received post on {}/{}: {}->{}", 
topic, partition, key, value);
                                 logger.info(
                                         "Submitting {} to process in thread 
pool",
                                         record.value().getJobId());
@@ -365,11 +367,11 @@ public class PostWorkflowManager extends WorkflowManager {
                                 }));
 
                                 consumer.commitSync(Collections.singletonMap(
-                                        partition, new 
OffsetAndMetadata(record.offset() + 1)));
+                                        topicPartition, new 
OffsetAndMetadata(record.offset() + 1)));
                             }
                         }
 
-                        for (Future<Boolean> f : processingFutures) {
+                        for (var f : processingFutures) {
                             try {
                                 executorCompletionService.take().get();
                             } catch (Exception e) {
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 0f6967a5ef..d3b2eb70ba 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -26,15 +26,18 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.monitor.JobStatusResult;
 import org.apache.kafka.clients.producer.*;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageProducer {
 
+    private static final Logger log = 
LoggerFactory.getLogger(MessageProducer.class);
     final Producer<String, JobStatusResult> producer;
-    final String jobMonitorQueue;
+    final String topic;
 
     public MessageProducer() throws ApplicationSettingsException {
         producer = createProducer();
-        jobMonitorQueue = 
ServerSettings.getSetting("job.monitor.broker.topic");
+        topic = ServerSettings.getSetting("job.monitor.broker.topic");
     }
 
     private Producer<String, JobStatusResult> createProducer() throws 
ApplicationSettingsException {
@@ -43,13 +46,14 @@ public class MessageProducer {
         props.put(ProducerConfig.CLIENT_ID_CONFIG, 
ServerSettings.getSetting("job.monitor.broker.publisher.id"));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
JobStatusResultSerializer.class.getName());
-        return new KafkaProducer<String, JobStatusResult>(props);
+        return new KafkaProducer<>(props);
     }
 
     public void submitMessageToQueue(JobStatusResult jobStatusResult) throws 
ExecutionException, InterruptedException {
         var jobId = jobStatusResult.getJobId();
-        final var record = new ProducerRecord<>(jobMonitorQueue, jobId, 
jobStatusResult);
-        RecordMetadata recordMetadata = producer.send(record).get();
+        final var record = new ProducerRecord<>(topic, jobId, jobStatusResult);
+        producer.send(record).get();
+        log.info("MessageProducer posted to {}: {}->{}", topic, jobId, 
jobStatusResult);
         producer.flush();
     }
 }
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 7efddab18f..dde0582d8f 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -19,6 +19,7 @@
 */
 package org.apache.airavata.monitor.realtime;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -67,25 +68,22 @@ public class RealtimeMonitor extends AbstractMonitor {
         final Consumer<String, String> consumer = createConsumer();
 
         while (true) {
-            final ConsumerRecords<String, String> consumerRecords = 
consumer.poll(1000);
+            final ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofSeconds(1));
+            RegistryService.Client registryClient = 
getRegistryClientPool().getResource();
             consumerRecords.forEach(record -> {
-                RegistryService.Client registryClient = 
getRegistryClientPool().getResource();
                 try {
-                    process(record.value(), registryClient);
-                    getRegistryClientPool().returnResource(registryClient);
+                    process(record.key(), record.value(), registryClient);
                 } catch (Exception e) {
-                    logger.error("Error while processing message " + 
record.value(), e);
-                    
getRegistryClientPool().returnBrokenResource(registryClient);
-                    // ignore this error
+                    logger.error("Error while processing message {}", 
record.value(), e);
                 }
             });
-
+            getRegistryClientPool().returnResource(registryClient);
             consumer.commitAsync();
         }
     }
 
-    private void process(String value, RegistryService.Client registryClient) 
throws MonitoringException {
-        logger.info("Received Job Status [{}]: {}", publisherId, value);
+    private void process(String key, String value, RegistryService.Client 
registryClient) throws MonitoringException {
+        logger.info("received post from {} on {}: {}->{}", publisherId, 
brokerTopic, key, value);
         JobStatusResult statusResult = parser.parse(value, publisherId, 
registryClient);
         if (statusResult != null) {
             logger.info("Submitting message to job monitor queue");
diff --git a/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java b/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
index f8f8a2db33..fd8f7664d1 100644
--- a/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
+++ b/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
@@ -19,6 +19,7 @@
 */
 package org.apache.airavata.file.server;
 
+import java.time.Duration;
 import org.apache.airavata.common.utils.ThriftClientPool;
 import org.apache.airavata.helix.core.support.adaptor.AdaptorSupportImpl;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
@@ -55,9 +56,9 @@ public class FileServerConfiguration {
         poolConfig.setTestOnBorrow(true);
         poolConfig.setTestWhileIdle(true);
         // must set timeBetweenEvictionRunsMillis since eviction doesn't run 
unless that is positive
-        poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
+        poolConfig.setTimeBetweenEvictionRuns(Duration.ofMinutes(5));
         poolConfig.setNumTestsPerEvictionRun(10);
-        poolConfig.setMaxWaitMillis(3000);
+        poolConfig.setMaxWait(Duration.ofSeconds(3));
 
         return new ThriftClientPool<>(RegistryService.Client::new, poolConfig, 
registryServerHost, registryServerPort);
     }
diff --git a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
index 1680928188..a7c7d8fa65 100644
--- a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
+++ b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
@@ -23,18 +23,20 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import jakarta.annotation.PostConstruct;
 import java.util.Properties;
-import java.util.concurrent.Future;
 import org.apache.airavata.restproxy.RestProxyConfiguration;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
 @RestController
 public class ProxyController {
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyController.class);
+
     private KafkaProducer<String, String> producer;
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final RestProxyConfiguration restProxyConfiguration;
@@ -47,6 +49,7 @@ public class ProxyController {
     public void init() {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
restProxyConfiguration.getBrokerUrl());
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "RestProxyProducer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         producer = new KafkaProducer<>(props);
@@ -63,9 +66,11 @@ public class ProxyController {
                 JsonNode valueNode = record.get("value");
                 if (valueNode == null) continue;
                 String valueStr = objectMapper.writeValueAsString(valueNode);
-                ProducerRecord<String, String> kafkaRecord = new 
ProducerRecord<>(topic, null, valueStr);
-                Future<RecordMetadata> future = producer.send(kafkaRecord);
-                future.get();
+                log.info("Received message for topic {}: {}", topic, valueStr);
+                var kafkaRecord = new ProducerRecord<String, String>(topic, 
valueStr);
+                producer.send(kafkaRecord).get();
+                log.info("RestProxyProducer posted to topic {}: {}", topic, 
valueStr);
+                producer.flush();
             }
             return ResponseEntity.ok().build();
         } catch (Exception e) {

Reply via email to