This is an automated email from the ASF dual-hosted git repository.
yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new e47f9c65eb improve logging and code organization
e47f9c65eb is described below
commit e47f9c65eb7bc043a1cd6b093871279535d202d0
Author: yasithdev <[email protected]>
AuthorDate: Tue Jul 15 03:14:33 2025 -0500
improve logging and code organization
---
.../impl/task/parsing/ParsingTriggeringTask.java | 24 +++++++++++++++-------
.../helix/impl/workflow/PostWorkflowManager.java | 22 +++++++++++---------
.../airavata/monitor/kafka/MessageProducer.java | 14 ++++++++-----
.../airavata/monitor/realtime/RealtimeMonitor.java | 18 ++++++++--------
.../file/server/FileServerConfiguration.java | 5 +++--
.../restproxy/controller/ProxyController.java | 15 +++++++++-----
6 files changed, 59 insertions(+), 39 deletions(-)
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index f0d3ea37b6..4ab3d10b18 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -37,9 +37,18 @@ import org.slf4j.LoggerFactory;
@TaskDef(name = "Parsing Triggering Task")
public class ParsingTriggeringTask extends AiravataTask {
- private static final Logger logger =
LoggerFactory.getLogger(DataParsingTask.class);
+ private static final Logger logger =
LoggerFactory.getLogger(ParsingTriggeringTask.class);
private static Producer<String, ProcessCompletionMessage> producer;
+ private final String topic;
+
+ public ParsingTriggeringTask() {
+ try {
+ topic = ServerSettings.getSetting("data.parser.topic");
+ } catch (ApplicationSettingsException e) {
+ throw new RuntimeException(e);
+ }
+ }
private void createProducer() throws ApplicationSettingsException {
@@ -49,22 +58,23 @@ public class ParsingTriggeringTask extends AiravataTask {
props.put(ProducerConfig.CLIENT_ID_CONFIG,
ServerSettings.getSetting("data.parser.broker.publisher.id"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ProcessCompletionMessageSerializer.class.getName());
- producer = new KafkaProducer<String,
ProcessCompletionMessage>(props);
+ producer = new KafkaProducer<>(props);
}
}
public void submitMessageToParserEngine(ProcessCompletionMessage
completionMessage)
- throws ExecutionException, InterruptedException,
ApplicationSettingsException {
- final ProducerRecord<String, ProcessCompletionMessage> record = new
ProducerRecord<>(
- ServerSettings.getSetting("data.parser.topic"),
completionMessage.getExperimentId(), completionMessage);
- RecordMetadata recordMetadata = producer.send(record).get();
+ throws ExecutionException, InterruptedException {
+ var experimentId = completionMessage.getExperimentId();
+ var record = new ProducerRecord<>(topic, experimentId,
completionMessage);
+ producer.send(record).get();
+ logger.info("ParsingTriggeringTask posted to {}: {}->{}", topic,
experimentId, completionMessage);
producer.flush();
}
@Override
public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
- logger.info("Starting parsing triggerring task " + getTaskId() + ",
experiment id " + getExperimentId());
+ logger.info("Starting parsing triggering task {}, experiment id {}",
getTaskId(), getExperimentId());
ProcessCompletionMessage completionMessage = new
ProcessCompletionMessage();
completionMessage.setExperimentId(getExperimentId());
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index ccaf671aed..a23d1b5edf 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -51,7 +51,6 @@ import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.airavata.patform.monitoring.MonitoringServer;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -338,16 +337,19 @@ public class PostWorkflowManager extends WorkflowManager {
new Thread(() -> {
while (true) {
final ConsumerRecords<String, JobStatusResult>
consumerRecords = consumer.poll(Long.MAX_VALUE);
- CompletionService<Boolean> executorCompletionService =
- new
ExecutorCompletionService<>(processingPool);
- List<Future<Boolean>> processingFutures = new
ArrayList<>();
+ var executorCompletionService = new
ExecutorCompletionService<>(processingPool);
+ var processingFutures = new ArrayList<>();
- for (TopicPartition partition :
consumerRecords.partitions()) {
- List<ConsumerRecord<String, JobStatusResult>>
partitionRecords =
- consumerRecords.records(partition);
+ for (var topicPartition :
consumerRecords.partitions()) {
+ var partitionRecords =
consumerRecords.records(topicPartition);
logger.info("Received job records {}",
partitionRecords.size());
- for (ConsumerRecord<String, JobStatusResult>
record : partitionRecords) {
+ for (var record : partitionRecords) {
+ var topic = topicPartition.topic();
+ var partition = topicPartition.partition();
+ var key = record.key();
+ var value = record.value();
+ logger.info("received post on {}/{}: {}->{}",
topic, partition, key, value);
logger.info(
"Submitting {} to process in thread
pool",
record.value().getJobId());
@@ -365,11 +367,11 @@ public class PostWorkflowManager extends WorkflowManager {
}));
consumer.commitSync(Collections.singletonMap(
- partition, new
OffsetAndMetadata(record.offset() + 1)));
+ topicPartition, new
OffsetAndMetadata(record.offset() + 1)));
}
}
- for (Future<Boolean> f : processingFutures) {
+ for (var f : processingFutures) {
try {
executorCompletionService.take().get();
} catch (Exception e) {
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 0f6967a5ef..d3b2eb70ba 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -26,15 +26,18 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.monitor.JobStatusResult;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageProducer {
+ private static final Logger log =
LoggerFactory.getLogger(MessageProducer.class);
final Producer<String, JobStatusResult> producer;
- final String jobMonitorQueue;
+ final String topic;
public MessageProducer() throws ApplicationSettingsException {
producer = createProducer();
- jobMonitorQueue =
ServerSettings.getSetting("job.monitor.broker.topic");
+ topic = ServerSettings.getSetting("job.monitor.broker.topic");
}
private Producer<String, JobStatusResult> createProducer() throws
ApplicationSettingsException {
@@ -43,13 +46,14 @@ public class MessageProducer {
props.put(ProducerConfig.CLIENT_ID_CONFIG,
ServerSettings.getSetting("job.monitor.broker.publisher.id"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JobStatusResultSerializer.class.getName());
- return new KafkaProducer<String, JobStatusResult>(props);
+ return new KafkaProducer<>(props);
}
public void submitMessageToQueue(JobStatusResult jobStatusResult) throws
ExecutionException, InterruptedException {
var jobId = jobStatusResult.getJobId();
- final var record = new ProducerRecord<>(jobMonitorQueue, jobId,
jobStatusResult);
- RecordMetadata recordMetadata = producer.send(record).get();
+ final var record = new ProducerRecord<>(topic, jobId, jobStatusResult);
+ producer.send(record).get();
+ log.info("MessageProducer posted to {}: {}->{}", topic, jobId,
jobStatusResult);
producer.flush();
}
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 7efddab18f..dde0582d8f 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.monitor.realtime;
+import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -67,25 +68,22 @@ public class RealtimeMonitor extends AbstractMonitor {
final Consumer<String, String> consumer = createConsumer();
while (true) {
- final ConsumerRecords<String, String> consumerRecords =
consumer.poll(1000);
+ final ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofSeconds(1));
+ RegistryService.Client registryClient =
getRegistryClientPool().getResource();
consumerRecords.forEach(record -> {
- RegistryService.Client registryClient =
getRegistryClientPool().getResource();
try {
- process(record.value(), registryClient);
- getRegistryClientPool().returnResource(registryClient);
+ process(record.key(), record.value(), registryClient);
} catch (Exception e) {
- logger.error("Error while processing message " +
record.value(), e);
-
getRegistryClientPool().returnBrokenResource(registryClient);
- // ignore this error
+ logger.error("Error while processing message {}",
record.value(), e);
}
});
-
+ getRegistryClientPool().returnResource(registryClient);
consumer.commitAsync();
}
}
- private void process(String value, RegistryService.Client registryClient)
throws MonitoringException {
- logger.info("Received Job Status [{}]: {}", publisherId, value);
+ private void process(String key, String value, RegistryService.Client
registryClient) throws MonitoringException {
+ logger.info("received post from {} on {}: {}->{}", publisherId,
brokerTopic, key, value);
JobStatusResult statusResult = parser.parse(value, publisherId,
registryClient);
if (statusResult != null) {
logger.info("Submitting message to job monitor queue");
diff --git
a/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
b/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
index f8f8a2db33..fd8f7664d1 100644
---
a/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
+++
b/modules/file-server/src/main/java/org/apache/airavata/file/server/FileServerConfiguration.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.file.server;
+import java.time.Duration;
import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.helix.core.support.adaptor.AdaptorSupportImpl;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
@@ -55,9 +56,9 @@ public class FileServerConfiguration {
poolConfig.setTestOnBorrow(true);
poolConfig.setTestWhileIdle(true);
// must set timeBetweenEvictionRunsMillis since eviction doesn't run
unless that is positive
- poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
+ poolConfig.setTimeBetweenEvictionRuns(Duration.ofMinutes(5));
poolConfig.setNumTestsPerEvictionRun(10);
- poolConfig.setMaxWaitMillis(3000);
+ poolConfig.setMaxWait(Duration.ofSeconds(3));
return new ThriftClientPool<>(RegistryService.Client::new, poolConfig,
registryServerHost, registryServerPort);
}
diff --git
a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
index 1680928188..a7c7d8fa65 100644
---
a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
+++
b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
@@ -23,18 +23,20 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import java.util.Properties;
-import java.util.concurrent.Future;
import org.apache.airavata.restproxy.RestProxyConfiguration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
public class ProxyController {
+ private static final Logger log =
LoggerFactory.getLogger(ProxyController.class);
+
private KafkaProducer<String, String> producer;
private final ObjectMapper objectMapper = new ObjectMapper();
private final RestProxyConfiguration restProxyConfiguration;
@@ -47,6 +49,7 @@ public class ProxyController {
public void init() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
restProxyConfiguration.getBrokerUrl());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "RestProxyProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
@@ -63,9 +66,11 @@ public class ProxyController {
JsonNode valueNode = record.get("value");
if (valueNode == null) continue;
String valueStr = objectMapper.writeValueAsString(valueNode);
- ProducerRecord<String, String> kafkaRecord = new
ProducerRecord<>(topic, null, valueStr);
- Future<RecordMetadata> future = producer.send(kafkaRecord);
- future.get();
+ log.info("Received message for topic {}: {}", topic, valueStr);
+ var kafkaRecord = new ProducerRecord<String, String>(topic,
valueStr);
+ producer.send(kafkaRecord).get();
+ log.info("RestProxyProducer posted to topic {}: {}", topic,
valueStr);
+ producer.flush();
}
return ResponseEntity.ok().build();
} catch (Exception e) {