This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new 620fce3d04 simplify repeated properties and organize some property loads.
620fce3d04 is described below

commit 620fce3d04d7020b7155d3bfdf1a3d854278a48c
Author: yasithdev <[email protected]>
AuthorDate: Mon Jul 14 20:37:47 2025 -0500

    simplify repeated properties and organize some property loads.
---
 .devcontainer/docker-compose-alt.yml               |  2 +-
 .../impl/task/parsing/ParsingTriggeringTask.java   |  6 +-
 .../helix/impl/workflow/ParserWorkflowManager.java |  8 +--
 .../helix/impl/workflow/PostWorkflowManager.java   |  4 +-
 .../airavata/monitor/email/EmailBasedMonitor.java  | 17 +++---
 .../airavata/monitor/kafka/MessageProducer.java    | 11 ++--
 .../airavata/monitor/realtime/RealtimeMonitor.java | 14 +++--
 .../realtime/parser/RealtimeJobStatusParser.java   | 68 ++++++++--------------
 .../src/main/resources/airavata-server.properties  | 14 ++---
 .../parser-wm/airavata-server.properties.j2        | 10 ++--
 .../participant/airavata-server.properties.j2      |  6 +-
 .../post-wm/airavata-server.properties.j2          |  4 +-
 .../email-monitor/airavata-server.properties.j2    |  2 +-
 .../realtime-monitor/airavata-server.properties.j2 |  5 +-
 .../restproxy/controller/ProxyController.java      |  5 +-
 15 files changed, 79 insertions(+), 97 deletions(-)

diff --git a/.devcontainer/docker-compose-alt.yml 
b/.devcontainer/docker-compose-alt.yml
index b105917d60..d1e1e919f2 100644
--- a/.devcontainer/docker-compose-alt.yml
+++ b/.devcontainer/docker-compose-alt.yml
@@ -218,7 +218,7 @@ services:
       - regserver.server.port=8970
       - email.based.monitor.address=CHANGEME
       - email.based.monitor.password=CHANGEME
-      - job.monitor.broker.url=kafka:9092
+      - kafka.broker.url=kafka:9092
     command: ["/tmp/wait-for-it.sh", "zookeeper:2181", "--", 
"/tmp/wait-for-it.sh", "apiserver:8970", "--" , "/tmp/wait-for-it.sh", 
"kafka:9092", "--", "/opt/apache-airavata-email-monitor/bin/email-monitor.sh"]
 
   db:
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index 53cc488304..6c88b07fa0 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -45,8 +45,8 @@ public class ParsingTriggeringTask extends AiravataTask {
 
         if (producer == null) {
             Properties props = new Properties();
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.parsing.broker.url"));
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, 
ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.broker.url"));
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, 
ServerSettings.getSetting("data.parser.broker.publisher.id"));
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ProcessCompletionMessageSerializer.class.getName());
             producer = new KafkaProducer<String, 
ProcessCompletionMessage>(props);
@@ -56,7 +56,7 @@ public class ParsingTriggeringTask extends AiravataTask {
     public void submitMessageToParserEngine(ProcessCompletionMessage 
completionMessage)
             throws ExecutionException, InterruptedException, 
ApplicationSettingsException {
         final ProducerRecord<String, ProcessCompletionMessage> record = new 
ProducerRecord<>(
-                ServerSettings.getSetting("kafka.parser.topic"),
+                ServerSettings.getSetting("data.parser.topic"),
                 completionMessage.getExperimentId(),
                 completionMessage);
         RecordMetadata recordMetadata = producer.send(record).get();
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 688e82a037..e75f494210 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -56,7 +56,7 @@ public class ParserWorkflowManager extends WorkflowManager {
     private static final Logger logger = 
LoggerFactory.getLogger(ParserWorkflowManager.class);
     private static final CountMonitor parserwfCounter = new 
CountMonitor("parser_wf_counter");
 
-    private String parserStorageResourceId = 
ServerSettings.getSetting("parser.storage.resource.id");
+    private String parserStorageResourceId = 
ServerSettings.getSetting("data.parser.storage.resource.id");
 
     public ParserWorkflowManager() throws ApplicationSettingsException {
         super(
@@ -440,14 +440,14 @@ public class ParserWorkflowManager extends 
WorkflowManager {
 
         final Properties props = new Properties();
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.parsing.broker.url"));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.broker.url"));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
ServerSettings.getSetting("data.parser.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ProcessCompletionMessageDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         final Consumer<String, ProcessCompletionMessage> consumer = new 
KafkaConsumer<>(props);
 
-        
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.parser.topic")));
+        
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("data.parser.topic")));
 
         logger.info("Starting the kafka consumer..");
 
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 2dc6fb15a3..8fe307cbae 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -92,7 +92,7 @@ public class PostWorkflowManager extends WorkflowManager {
     private Consumer<String, JobStatusResult> createConsumer() throws 
ApplicationSettingsException {
         final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.broker.url"));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
ServerSettings.getSetting("kafka.broker.consumer.group"));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
ServerSettings.getSetting("job.monitor.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
JobStatusResultDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -100,7 +100,7 @@ public class PostWorkflowManager extends WorkflowManager {
         // Create the consumer using props.
         final Consumer<String, JobStatusResult> consumer = new 
KafkaConsumer<>(props);
         // Subscribe to the topic.
-        
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.broker.topic")));
+        
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("job.monitor.broker.topic")));
         return consumer;
     }
 
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index b56ddb1918..7e36df285d 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -62,6 +62,7 @@ public class EmailBasedMonitor extends AbstractMonitor 
implements Runnable {
     private Message[] flushUnseenMessages;
     private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new 
HashMap<>();
     private long emailExpirationTimeMinutes;
+    private String publisherId;
 
     public EmailBasedMonitor() throws Exception {
         init();
@@ -76,6 +77,7 @@ public class EmailBasedMonitor extends AbstractMonitor 
implements Runnable {
         storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
         folderName = ServerSettings.getEmailBasedMonitorFolderName();
         emailExpirationTimeMinutes = 
Long.parseLong(ServerSettings.getSetting("email.expiration.minutes"));
+        publisherId = 
ServerSettings.getSetting("job.monitor.email.publisher.id");
         if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) {
             throw new AiravataException(
                     "Unsupported store protocol , expected " + IMAPS + " or " 
+ POP3 + " but found " + storeProtocol);
@@ -143,7 +145,7 @@ public class EmailBasedMonitor extends AbstractMonitor 
implements Runnable {
         log.info("[EJM]: Added monitor Id : {} to email based monitor map", 
jobId);
     }
 
-    private JobStatusResult parse(Message message) throws MessagingException, 
AiravataException {
+    private JobStatusResult parse(Message message, String publisherId) throws 
MessagingException, AiravataException {
         Address fromAddress = message.getFrom()[0];
         String addressStr = fromAddress.toString();
         ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
@@ -156,7 +158,11 @@ public class EmailBasedMonitor extends AbstractMonitor 
implements Runnable {
 
         try {
             JobStatusResult jobStatusResult = emailParser.parseEmail(message, 
regClient);
-            
jobStatusResult.setPublisherName(ServerSettings.getSetting("job.monitor.email.publisher.id"));
+            jobStatusResult.setPublisherName(publisherId);
+            var jobId = jobStatusResult.getJobId();
+            var jobName = jobStatusResult.getJobName();
+            var jobStatus = jobStatusResult.getState().getValue();
+            log.info("Parsed Job Status: From=[{}], Id={}, Name={}, State={}", 
publisherId, jobId, jobName, jobStatus);
             return jobStatusResult;
         } catch (Exception e) {
             getRegistryClientPool().returnBrokenResource(regClient);
@@ -252,11 +258,8 @@ public class EmailBasedMonitor extends AbstractMonitor 
implements Runnable {
         List<Message> unreadMessages = new ArrayList<>();
         for (Message message : searchMessages) {
             try {
-                log.info("Parsing the job status message");
-                JobStatusResult jobStatusResult = parse(message);
-                log.info("Job message parsed. Job Id " + 
jobStatusResult.getJobId() + ", Job Name "
-                        + jobStatusResult.getJobName() + ", Job State "
-                        + jobStatusResult.getState().getValue());
+                log.info("Received Job Status [{}]: {}", publisherId, message);
+                JobStatusResult jobStatusResult = parse(message, publisherId);
                 submitJobStatus(jobStatusResult);
                 log.info("Submitted the job {} status to queue", 
jobStatusResult.getJobId());
                 processedMessages.add(message);
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 4320030a40..0f6967a5ef 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -30,24 +30,25 @@ import 
org.apache.kafka.common.serialization.StringSerializer;
 public class MessageProducer {
 
     final Producer<String, JobStatusResult> producer;
+    final String jobMonitorQueue;
 
     public MessageProducer() throws ApplicationSettingsException {
         producer = createProducer();
+        jobMonitorQueue = 
ServerSettings.getSetting("job.monitor.broker.topic");
     }
 
     private Producer<String, JobStatusResult> createProducer() throws 
ApplicationSettingsException {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("job.monitor.broker.url"));
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.broker.url"));
         props.put(ProducerConfig.CLIENT_ID_CONFIG, 
ServerSettings.getSetting("job.monitor.broker.publisher.id"));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
JobStatusResultSerializer.class.getName());
         return new KafkaProducer<String, JobStatusResult>(props);
     }
 
-    public void submitMessageToQueue(JobStatusResult jobStatusResult)
-            throws ExecutionException, InterruptedException, 
ApplicationSettingsException {
-        final ProducerRecord<String, JobStatusResult> record = new 
ProducerRecord<>(
-                ServerSettings.getSetting("job.monitor.broker.topic"), 
jobStatusResult.getJobId(), jobStatusResult);
+    public void submitMessageToQueue(JobStatusResult jobStatusResult) throws 
ExecutionException, InterruptedException {
+        var jobId = jobStatusResult.getJobId();
+        final var record = new ProducerRecord<>(jobMonitorQueue, jobId, 
jobStatusResult);
         RecordMetadata recordMetadata = producer.send(record).get();
         producer.flush();
     }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 16378a4459..7efddab18f 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -40,22 +40,26 @@ public class RealtimeMonitor extends AbstractMonitor {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RealtimeMonitor.class);
 
-    private RealtimeJobStatusParser parser;
+    private final RealtimeJobStatusParser parser;
+    private final String publisherId;
+    private final String brokerTopic;
 
     public RealtimeMonitor() throws ApplicationSettingsException {
         parser = new RealtimeJobStatusParser();
+        publisherId = 
ServerSettings.getSetting("job.monitor.realtime.publisher.id");
+        brokerTopic = 
ServerSettings.getSetting("realtime.monitor.broker.topic");
     }
 
     private Consumer<String, String> createConsumer() throws 
ApplicationSettingsException {
         final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("realtime.monitor.broker.url"));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
ServerSettings.getSetting("kafka.broker.url"));
         props.put(ConsumerConfig.GROUP_ID_CONFIG, 
ServerSettings.getSetting("realtime.monitor.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         // Create the consumer using props.
         final Consumer<String, String> consumer = new KafkaConsumer<>(props);
         // Subscribe to the topic.
-        
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("realtime.monitor.broker.topic")));
+        consumer.subscribe(Collections.singletonList(brokerTopic));
         return consumer;
     }
 
@@ -81,8 +85,8 @@ public class RealtimeMonitor extends AbstractMonitor {
     }
 
     private void process(String value, RegistryService.Client registryClient) 
throws MonitoringException {
-        logger.info("Received data " + value);
-        JobStatusResult statusResult = parser.parse(value, registryClient);
+        logger.info("Received Job Status [{}]: {}", publisherId, value);
+        JobStatusResult statusResult = parser.parse(value, publisherId, 
registryClient);
         if (statusResult != null) {
             logger.info("Submitting message to job monitor queue");
             submitJobStatus(statusResult);
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
index 6cb72d0913..4a109f065c 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonSyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.monitor.JobStatusResult;
@@ -41,9 +40,9 @@ public class RealtimeJobStatusParser {
         for (int i = 0; i < 3; i++) {
 
             List<JobModel> jobsOfTask = registryClient.getJobs("taskId", 
taskId);
-            if (jobsOfTask == null || jobsOfTask.size() == 0) {
+            if (jobsOfTask == null || jobsOfTask.isEmpty()) {
                 // Retry after 2s
-                logger.warn("No jobs for task " + taskId + ". Retrying in 2 
seconds");
+                logger.warn("No jobs for task {}. Retrying in 2 seconds", 
taskId);
                 Thread.sleep(2000);
             } else {
                 Optional<JobModel> filtered = jobsOfTask.stream()
@@ -52,7 +51,7 @@ public class RealtimeJobStatusParser {
                 if (filtered.isPresent()) {
                     return filtered.get().getJobId();
                 } else {
-                    logger.warn("No job for job name " + jobName + " and task 
" + taskId + ". Retrying in 2 seconds");
+                    logger.warn("No job for job name {} and task {}. Retrying 
in 2 seconds", jobName, taskId);
                     Thread.sleep(2000);
                 }
             }
@@ -60,7 +59,7 @@ public class RealtimeJobStatusParser {
         return null;
     }
 
-    public JobStatusResult parse(String rawMessage, RegistryService.Client 
registryClient) {
+    public JobStatusResult parse(String rawMessage, String publisherId, 
RegistryService.Client registryClient) {
 
         try {
             Map asMap = new Gson().fromJson(rawMessage, Map.class);
@@ -74,44 +73,26 @@ public class RealtimeJobStatusParser {
                     try {
                         String jobId = getJobIdIdByJobNameWithRetry(jobName, 
taskId, registryClient);
                         if (jobId == null) {
-                            logger.error("No job id for job name " + jobName);
+                            logger.error("No job id for job name {}", jobName);
                             return null;
                         }
 
-                        JobState jobState = null;
-
-                        switch (status) {
-                            case "RUNNING":
-                                jobState = JobState.ACTIVE;
-                                break;
-                            case "COMPLETED":
-                                jobState = JobState.COMPLETE;
-                                break;
-                            case "FAILED":
-                                jobState = JobState.FAILED;
-                                break;
-                            case "SUBMITTED":
-                                jobState = JobState.SUBMITTED;
-                                break;
-                            case "QUEUED":
-                                jobState = JobState.QUEUED;
-                                break;
-                            case "CANCELED":
-                                jobState = JobState.CANCELED;
-                                break;
-                            case "SUSPENDED":
-                                jobState = JobState.SUSPENDED;
-                                break;
-                            case "UNKNOWN":
-                                jobState = JobState.UNKNOWN;
-                                break;
-                            case "NON_CRITICAL_FAIL":
-                                jobState = JobState.NON_CRITICAL_FAIL;
-                                break;
-                        }
+                        JobState jobState =
+                                switch (status) {
+                                    case "RUNNING" -> JobState.ACTIVE;
+                                    case "COMPLETED" -> JobState.COMPLETE;
+                                    case "FAILED" -> JobState.FAILED;
+                                    case "SUBMITTED" -> JobState.SUBMITTED;
+                                    case "QUEUED" -> JobState.QUEUED;
+                                    case "CANCELED" -> JobState.CANCELED;
+                                    case "SUSPENDED" -> JobState.SUSPENDED;
+                                    case "UNKNOWN" -> JobState.UNKNOWN;
+                                    case "NON_CRITICAL_FAIL" -> 
JobState.NON_CRITICAL_FAIL;
+                                    default -> null;
+                                };
 
                         if (jobState == null) {
-                            logger.error("Invalid job state " + status);
+                            logger.error("Invalid job state {}", status);
                             return null;
                         }
 
@@ -119,23 +100,22 @@ public class RealtimeJobStatusParser {
                         jobStatusResult.setJobId(jobId);
                         jobStatusResult.setJobName(jobName);
                         jobStatusResult.setState(jobState);
-                        jobStatusResult.setPublisherName(
-                                
ServerSettings.getSetting("job.monitor.realtime.publisher.id"));
+                        jobStatusResult.setPublisherName(publisherId);
                         return jobStatusResult;
                     } catch (Exception e) {
-                        logger.error("Failed to fetch job id for job name " + 
jobName);
+                        logger.error("Failed to fetch job id for job name {}", 
jobName);
                         return null;
                     }
                 } else {
-                    logger.error("Job name, taskId or status is null in 
message " + rawMessage);
+                    logger.error("Job name, taskId or status is null in 
message {}", rawMessage);
                     return null;
                 }
             } else {
-                logger.error("Data structure of message " + rawMessage + " is 
not correct");
+                logger.error("Data structure of message {} is not correct", 
rawMessage);
                 return null;
             }
         } catch (JsonSyntaxException e) {
-            logger.error("Failed to parse raw data " + rawMessage + " to type 
Map", e);
+            logger.error("Failed to parse raw data {} to type Map", 
rawMessage, e);
             return null;
         }
     }
diff --git a/airavata-api/src/main/resources/airavata-server.properties 
b/airavata-api/src/main/resources/airavata-server.properties
index abe4425f36..7da7ff8ebf 100644
--- a/airavata-api/src/main/resources/airavata-server.properties
+++ b/airavata-api/src/main/resources/airavata-server.properties
@@ -87,18 +87,17 @@ job.monitor.broker.publisher.id=AiravataMonitorPublisher
 job.monitor.email.publisher.id=EmailBasedProducer
 job.monitor.realtime.publisher.id=RealtimeProducer
 job.monitor.broker.topic=monitoring-data
-job.monitor.broker.url=airavata.host:9092
+job.monitor.broker.consumer.group=MonitoringConsumer
+
 job.notification.emailids=
 job.notification.enable=true
 job.status.publish.endpoint=http://airavata.host:8082/topics/helix-airavata-mq
 
job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
 
 kafka.broker.url=airavata.host:9092
-kafka.broker.consumer.group=MonitoringConsumer
-kafka.broker.topic=monitoring-data
-kafka.parsing.broker.url=airavata.host:9092
-kafka.parser.broker.consumer.group=ParsingConsumer
-kafka.parser.topic=parsing-data
+
+data.parser.broker.consumer.group=ParsingConsumer
+data.parser.topic=parsing-data
 
 local.data.location=/tmp
 
@@ -109,7 +108,7 @@ orchestrator.server.min.threads=50
 orchestrator.server.port=8940
 orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
 
-parser.storage.resource.id=CHANGE_ME
+data.parser.storage.resource.id=CHANGE_ME
 
 participant.monitoring.enabled=true
 participant.monitoring.host=airavata.host
@@ -146,7 +145,6 @@ prefetch.count=200
 
 realtime.monitor.broker.consumer.group=monitor
 realtime.monitor.broker.topic=helix-airavata-mq
-realtime.monitor.broker.url=airavata.host:9092
 
 registry.jdbc.driver=org.mariadb.jdbc.Driver
 registry.jdbc.password=123456
diff --git 
a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
 
b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
index c097120d0d..649816abce 100644
--- 
a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
+++ 
b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
@@ -51,11 +51,11 @@ zookeeper.timeout=30000
 ###########################################################################
 # Data Parser Configurations
 ###########################################################################
-kafka.parsing.broker.url={{ parser_broker_url }}
-kafka.parser.broker.consumer.group={{ parser_broker_consumer_group }}
-kafka.parser.topic={{ parser_broker_topic }}
-parser.storage.resource.id={{ parser_storage_resource_id }}
-kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
+kafka.broker.url={{ parser_broker_url }}
+data.parser.broker.consumer.group={{ parser_broker_consumer_group }}
+data.parser.topic={{ parser_broker_topic }}
+data.parser.broker.publisher.id={{ parser_broker_publisher_id }}
+data.parser.storage.resource.id={{ parser_storage_resource_id }}
 post.workflow.manager.loadbalance.clusters=False
 
 ###########################################################################
diff --git 
a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
 
b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index 63fd7be85d..d652a69712 100644
--- 
a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++ 
b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -72,9 +72,9 @@ zookeeper.timeout=30000
 ###########################################################################
 # Data Parser Configurations
 ###########################################################################
-kafka.parsing.broker.url={{ parser_broker_url }}
-kafka.parser.topic={{ parser_broker_topic }}
-kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
+kafka.broker.url={{ parser_broker_url }}
+data.parser.topic={{ parser_broker_topic }}
+data.parser.broker.publisher.id={{ parser_broker_publisher_id }}
 
 ###########################################################################
 # Job Submission Task Level Configurations
diff --git 
a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
 
b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
index 06833e03e8..43aa33f2d2 100644
--- 
a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
+++ 
b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
@@ -26,8 +26,8 @@ regserver.server.port={{ registry_port }}
 # Helix workflow manager configurations
 ###########################################################################
 kafka.broker.url={{ job_monitor_broker_url }}
-kafka.broker.topic={{ job_monitor_broker_topic }}
-kafka.broker.consumer.group={{ job_monitor_broker_consumer_group }}
+job.monitor.broker.topic={{ job_monitor_broker_topic }}
+job.monitor.broker.consumer.group={{ job_monitor_broker_consumer_group }}
 helix.cluster.name={{ helix_cluster_name }}
 post.workflow.manager.name={{ helix_post_wm_name }}
 post.workflow.manager.loadbalance.clusters={{ 
helix_post_wm_load_balance_clusters }}
diff --git 
a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
 
b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
index 4bc1b99bd2..cfda787ec0 100644
--- 
a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
+++ 
b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
@@ -27,7 +27,7 @@ email.based.monitor.store.protocol=imaps
 email.based.monitoring.period=10000
 
 #These properties will be used to published parsed email messages to job 
monitor queue
-job.monitor.broker.url={{ job_monitor_broker_url }}
+kafka.broker.url={{ job_monitor_broker_url }}
 job.monitor.broker.topic={{ job_monitor_broker_topic }}
 job.monitor.email.publisher.id={{ email_job_monitor_broker_publisher }}
 
diff --git 
a/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
 
b/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
index 1324453969..d1d14f98c2 100644
--- 
a/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
+++ 
b/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
@@ -19,11 +19,10 @@
 zookeeper.server.connection={{ zookeeper_connection_url }}
 zookeeper.timeout=30000
 
-realtime.monitor.broker.url={{ realtime_monitor_broker_url }}
-realtime.monitor.broker.consumer.group={{ 
realtime_monitor_broker_consumer_group }}
+kafka.broker.url={{ job_monitor_broker_url }}
+realtime.monitor.broker.consumer.group={{realtime_monitor_broker_consumer_group
 }}
 realtime.monitor.broker.topic={{ realtime_monitor_broker_topic }}
 
-job.monitor.broker.url={{ job_monitor_broker_url }}
 job.monitor.broker.topic={{ job_monitor_broker_topic }}
 job.monitor.realtime.publisher.id={{ realtime_monitor_broker_publisher }}
 
diff --git 
a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
 
b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
index 217a82d8cd..1680928188 100644
--- 
a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
+++ 
b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
@@ -52,10 +52,7 @@ public class ProxyController {
         producer = new KafkaProducer<>(props);
     }
 
-    @PostMapping(
-            value = "/topics/{topic}",
-            consumes = "application/vnd.kafka.json.v2+json",
-            produces = "application/vnd.kafka.v2+json")
+    @PostMapping(value = "/topics/{topic}", consumes = 
"application/vnd.kafka.json.v2+json")
     public ResponseEntity<?> postToKafka(@PathVariable("topic") String topic, 
@RequestBody String body) {
         try {
             JsonNode root = objectMapper.readTree(body);

Reply via email to