This is an automated email from the ASF dual-hosted git repository.
yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 620fce3d04 simplify repeated properties and organize some property
loads.
620fce3d04 is described below
commit 620fce3d04d7020b7155d3bfdf1a3d854278a48c
Author: yasithdev <[email protected]>
AuthorDate: Mon Jul 14 20:37:47 2025 -0500
simplify repeated properties and organize some property loads.
---
.devcontainer/docker-compose-alt.yml | 2 +-
.../impl/task/parsing/ParsingTriggeringTask.java | 6 +-
.../helix/impl/workflow/ParserWorkflowManager.java | 8 +--
.../helix/impl/workflow/PostWorkflowManager.java | 4 +-
.../airavata/monitor/email/EmailBasedMonitor.java | 17 +++---
.../airavata/monitor/kafka/MessageProducer.java | 11 ++--
.../airavata/monitor/realtime/RealtimeMonitor.java | 14 +++--
.../realtime/parser/RealtimeJobStatusParser.java | 68 ++++++++--------------
.../src/main/resources/airavata-server.properties | 14 ++---
.../parser-wm/airavata-server.properties.j2 | 10 ++--
.../participant/airavata-server.properties.j2 | 6 +-
.../post-wm/airavata-server.properties.j2 | 4 +-
.../email-monitor/airavata-server.properties.j2 | 2 +-
.../realtime-monitor/airavata-server.properties.j2 | 5 +-
.../restproxy/controller/ProxyController.java | 5 +-
15 files changed, 79 insertions(+), 97 deletions(-)
diff --git a/.devcontainer/docker-compose-alt.yml
b/.devcontainer/docker-compose-alt.yml
index b105917d60..d1e1e919f2 100644
--- a/.devcontainer/docker-compose-alt.yml
+++ b/.devcontainer/docker-compose-alt.yml
@@ -218,7 +218,7 @@ services:
- regserver.server.port=8970
- email.based.monitor.address=CHANGEME
- email.based.monitor.password=CHANGEME
- - job.monitor.broker.url=kafka:9092
+ - kafka.broker.url=kafka:9092
command: ["/tmp/wait-for-it.sh", "zookeeper:2181", "--",
"/tmp/wait-for-it.sh", "apiserver:8970", "--" , "/tmp/wait-for-it.sh",
"kafka:9092", "--", "/opt/apache-airavata-email-monitor/bin/email-monitor.sh"]
db:
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index 53cc488304..6c88b07fa0 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -45,8 +45,8 @@ public class ParsingTriggeringTask extends AiravataTask {
if (producer == null) {
Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.parsing.broker.url"));
- props.put(ProducerConfig.CLIENT_ID_CONFIG,
ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.broker.url"));
+ props.put(ProducerConfig.CLIENT_ID_CONFIG,
ServerSettings.getSetting("data.parser.broker.publisher.id"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ProcessCompletionMessageSerializer.class.getName());
producer = new KafkaProducer<String,
ProcessCompletionMessage>(props);
@@ -56,7 +56,7 @@ public class ParsingTriggeringTask extends AiravataTask {
public void submitMessageToParserEngine(ProcessCompletionMessage
completionMessage)
throws ExecutionException, InterruptedException,
ApplicationSettingsException {
final ProducerRecord<String, ProcessCompletionMessage> record = new
ProducerRecord<>(
- ServerSettings.getSetting("kafka.parser.topic"),
+ ServerSettings.getSetting("data.parser.topic"),
completionMessage.getExperimentId(),
completionMessage);
RecordMetadata recordMetadata = producer.send(record).get();
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 688e82a037..e75f494210 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -56,7 +56,7 @@ public class ParserWorkflowManager extends WorkflowManager {
private static final Logger logger =
LoggerFactory.getLogger(ParserWorkflowManager.class);
private static final CountMonitor parserwfCounter = new
CountMonitor("parser_wf_counter");
- private String parserStorageResourceId =
ServerSettings.getSetting("parser.storage.resource.id");
+ private String parserStorageResourceId =
ServerSettings.getSetting("data.parser.storage.resource.id");
public ParserWorkflowManager() throws ApplicationSettingsException {
super(
@@ -440,14 +440,14 @@ public class ParserWorkflowManager extends
WorkflowManager {
final Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.parsing.broker.url"));
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.broker.url"));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
ServerSettings.getSetting("data.parser.broker.consumer.group"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ProcessCompletionMessageDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
final Consumer<String, ProcessCompletionMessage> consumer = new
KafkaConsumer<>(props);
-
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.parser.topic")));
+
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("data.parser.topic")));
logger.info("Starting the kafka consumer..");
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 2dc6fb15a3..8fe307cbae 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -92,7 +92,7 @@ public class PostWorkflowManager extends WorkflowManager {
private Consumer<String, JobStatusResult> createConsumer() throws
ApplicationSettingsException {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.broker.url"));
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
ServerSettings.getSetting("kafka.broker.consumer.group"));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
ServerSettings.getSetting("job.monitor.broker.consumer.group"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JobStatusResultDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -100,7 +100,7 @@ public class PostWorkflowManager extends WorkflowManager {
// Create the consumer using props.
final Consumer<String, JobStatusResult> consumer = new
KafkaConsumer<>(props);
// Subscribe to the topic.
-
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.broker.topic")));
+
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("job.monitor.broker.topic")));
return consumer;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index b56ddb1918..7e36df285d 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -62,6 +62,7 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
private Message[] flushUnseenMessages;
private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new
HashMap<>();
private long emailExpirationTimeMinutes;
+ private String publisherId;
public EmailBasedMonitor() throws Exception {
init();
@@ -76,6 +77,7 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
folderName = ServerSettings.getEmailBasedMonitorFolderName();
emailExpirationTimeMinutes =
Long.parseLong(ServerSettings.getSetting("email.expiration.minutes"));
+ publisherId =
ServerSettings.getSetting("job.monitor.email.publisher.id");
if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) {
throw new AiravataException(
"Unsupported store protocol , expected " + IMAPS + " or "
+ POP3 + " but found " + storeProtocol);
@@ -143,7 +145,7 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
log.info("[EJM]: Added monitor Id : {} to email based monitor map",
jobId);
}
- private JobStatusResult parse(Message message) throws MessagingException,
AiravataException {
+ private JobStatusResult parse(Message message, String publisherId) throws
MessagingException, AiravataException {
Address fromAddress = message.getFrom()[0];
String addressStr = fromAddress.toString();
ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
@@ -156,7 +158,11 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
try {
JobStatusResult jobStatusResult = emailParser.parseEmail(message,
regClient);
-
jobStatusResult.setPublisherName(ServerSettings.getSetting("job.monitor.email.publisher.id"));
+ jobStatusResult.setPublisherName(publisherId);
+ var jobId = jobStatusResult.getJobId();
+ var jobName = jobStatusResult.getJobName();
+ var jobStatus = jobStatusResult.getState().getValue();
+ log.info("Parsed Job Status: From=[{}], Id={}, Name={}, State={}",
publisherId, jobId, jobName, jobStatus);
return jobStatusResult;
} catch (Exception e) {
getRegistryClientPool().returnBrokenResource(regClient);
@@ -252,11 +258,8 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
List<Message> unreadMessages = new ArrayList<>();
for (Message message : searchMessages) {
try {
- log.info("Parsing the job status message");
- JobStatusResult jobStatusResult = parse(message);
- log.info("Job message parsed. Job Id " +
jobStatusResult.getJobId() + ", Job Name "
- + jobStatusResult.getJobName() + ", Job State "
- + jobStatusResult.getState().getValue());
+ log.info("Received Job Status [{}]: {}", publisherId, message);
+ JobStatusResult jobStatusResult = parse(message, publisherId);
submitJobStatus(jobStatusResult);
log.info("Submitted the job {} status to queue",
jobStatusResult.getJobId());
processedMessages.add(message);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 4320030a40..0f6967a5ef 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -30,24 +30,25 @@ import
org.apache.kafka.common.serialization.StringSerializer;
public class MessageProducer {
final Producer<String, JobStatusResult> producer;
+ final String jobMonitorQueue;
public MessageProducer() throws ApplicationSettingsException {
producer = createProducer();
+ jobMonitorQueue =
ServerSettings.getSetting("job.monitor.broker.topic");
}
private Producer<String, JobStatusResult> createProducer() throws
ApplicationSettingsException {
Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("job.monitor.broker.url"));
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.broker.url"));
props.put(ProducerConfig.CLIENT_ID_CONFIG,
ServerSettings.getSetting("job.monitor.broker.publisher.id"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JobStatusResultSerializer.class.getName());
return new KafkaProducer<String, JobStatusResult>(props);
}
- public void submitMessageToQueue(JobStatusResult jobStatusResult)
- throws ExecutionException, InterruptedException,
ApplicationSettingsException {
- final ProducerRecord<String, JobStatusResult> record = new
ProducerRecord<>(
- ServerSettings.getSetting("job.monitor.broker.topic"),
jobStatusResult.getJobId(), jobStatusResult);
+ public void submitMessageToQueue(JobStatusResult jobStatusResult) throws
ExecutionException, InterruptedException {
+ var jobId = jobStatusResult.getJobId();
+ final var record = new ProducerRecord<>(jobMonitorQueue, jobId,
jobStatusResult);
RecordMetadata recordMetadata = producer.send(record).get();
producer.flush();
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 16378a4459..7efddab18f 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -40,22 +40,26 @@ public class RealtimeMonitor extends AbstractMonitor {
private static final Logger logger =
LoggerFactory.getLogger(RealtimeMonitor.class);
- private RealtimeJobStatusParser parser;
+ private final RealtimeJobStatusParser parser;
+ private final String publisherId;
+ private final String brokerTopic;
public RealtimeMonitor() throws ApplicationSettingsException {
parser = new RealtimeJobStatusParser();
+ publisherId =
ServerSettings.getSetting("job.monitor.realtime.publisher.id");
+ brokerTopic =
ServerSettings.getSetting("realtime.monitor.broker.topic");
}
private Consumer<String, String> createConsumer() throws
ApplicationSettingsException {
final Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("realtime.monitor.broker.url"));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ServerSettings.getSetting("kafka.broker.url"));
props.put(ConsumerConfig.GROUP_ID_CONFIG,
ServerSettings.getSetting("realtime.monitor.broker.consumer.group"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Create the consumer using props.
final Consumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
-
consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("realtime.monitor.broker.topic")));
+ consumer.subscribe(Collections.singletonList(brokerTopic));
return consumer;
}
@@ -81,8 +85,8 @@ public class RealtimeMonitor extends AbstractMonitor {
}
private void process(String value, RegistryService.Client registryClient)
throws MonitoringException {
- logger.info("Received data " + value);
- JobStatusResult statusResult = parser.parse(value, registryClient);
+ logger.info("Received Job Status [{}]: {}", publisherId, value);
+ JobStatusResult statusResult = parser.parse(value, publisherId,
registryClient);
if (statusResult != null) {
logger.info("Submitting message to job monitor queue");
submitJobStatus(statusResult);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
index 6cb72d0913..4a109f065c 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonSyntaxException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.monitor.JobStatusResult;
@@ -41,9 +40,9 @@ public class RealtimeJobStatusParser {
for (int i = 0; i < 3; i++) {
List<JobModel> jobsOfTask = registryClient.getJobs("taskId",
taskId);
- if (jobsOfTask == null || jobsOfTask.size() == 0) {
+ if (jobsOfTask == null || jobsOfTask.isEmpty()) {
// Retry after 2s
- logger.warn("No jobs for task " + taskId + ". Retrying in 2
seconds");
+ logger.warn("No jobs for task {}. Retrying in 2 seconds",
taskId);
Thread.sleep(2000);
} else {
Optional<JobModel> filtered = jobsOfTask.stream()
@@ -52,7 +51,7 @@ public class RealtimeJobStatusParser {
if (filtered.isPresent()) {
return filtered.get().getJobId();
} else {
- logger.warn("No job for job name " + jobName + " and task
" + taskId + ". Retrying in 2 seconds");
+ logger.warn("No job for job name {} and task {}. Retrying
in 2 seconds", jobName, taskId);
Thread.sleep(2000);
}
}
@@ -60,7 +59,7 @@ public class RealtimeJobStatusParser {
return null;
}
- public JobStatusResult parse(String rawMessage, RegistryService.Client
registryClient) {
+ public JobStatusResult parse(String rawMessage, String publisherId,
RegistryService.Client registryClient) {
try {
Map asMap = new Gson().fromJson(rawMessage, Map.class);
@@ -74,44 +73,26 @@ public class RealtimeJobStatusParser {
try {
String jobId = getJobIdIdByJobNameWithRetry(jobName,
taskId, registryClient);
if (jobId == null) {
- logger.error("No job id for job name " + jobName);
+ logger.error("No job id for job name {}", jobName);
return null;
}
- JobState jobState = null;
-
- switch (status) {
- case "RUNNING":
- jobState = JobState.ACTIVE;
- break;
- case "COMPLETED":
- jobState = JobState.COMPLETE;
- break;
- case "FAILED":
- jobState = JobState.FAILED;
- break;
- case "SUBMITTED":
- jobState = JobState.SUBMITTED;
- break;
- case "QUEUED":
- jobState = JobState.QUEUED;
- break;
- case "CANCELED":
- jobState = JobState.CANCELED;
- break;
- case "SUSPENDED":
- jobState = JobState.SUSPENDED;
- break;
- case "UNKNOWN":
- jobState = JobState.UNKNOWN;
- break;
- case "NON_CRITICAL_FAIL":
- jobState = JobState.NON_CRITICAL_FAIL;
- break;
- }
+ JobState jobState =
+ switch (status) {
+ case "RUNNING" -> JobState.ACTIVE;
+ case "COMPLETED" -> JobState.COMPLETE;
+ case "FAILED" -> JobState.FAILED;
+ case "SUBMITTED" -> JobState.SUBMITTED;
+ case "QUEUED" -> JobState.QUEUED;
+ case "CANCELED" -> JobState.CANCELED;
+ case "SUSPENDED" -> JobState.SUSPENDED;
+ case "UNKNOWN" -> JobState.UNKNOWN;
+ case "NON_CRITICAL_FAIL" ->
JobState.NON_CRITICAL_FAIL;
+ default -> null;
+ };
if (jobState == null) {
- logger.error("Invalid job state " + status);
+ logger.error("Invalid job state {}", status);
return null;
}
@@ -119,23 +100,22 @@ public class RealtimeJobStatusParser {
jobStatusResult.setJobId(jobId);
jobStatusResult.setJobName(jobName);
jobStatusResult.setState(jobState);
- jobStatusResult.setPublisherName(
-
ServerSettings.getSetting("job.monitor.realtime.publisher.id"));
+ jobStatusResult.setPublisherName(publisherId);
return jobStatusResult;
} catch (Exception e) {
- logger.error("Failed to fetch job id for job name " +
jobName);
+ logger.error("Failed to fetch job id for job name {}",
jobName);
return null;
}
} else {
- logger.error("Job name, taskId or status is null in
message " + rawMessage);
+ logger.error("Job name, taskId or status is null in
message {}", rawMessage);
return null;
}
} else {
- logger.error("Data structure of message " + rawMessage + " is
not correct");
+ logger.error("Data structure of message {} is not correct",
rawMessage);
return null;
}
} catch (JsonSyntaxException e) {
- logger.error("Failed to parse raw data " + rawMessage + " to type
Map", e);
+ logger.error("Failed to parse raw data {} to type Map",
rawMessage, e);
return null;
}
}
diff --git a/airavata-api/src/main/resources/airavata-server.properties
b/airavata-api/src/main/resources/airavata-server.properties
index abe4425f36..7da7ff8ebf 100644
--- a/airavata-api/src/main/resources/airavata-server.properties
+++ b/airavata-api/src/main/resources/airavata-server.properties
@@ -87,18 +87,17 @@ job.monitor.broker.publisher.id=AiravataMonitorPublisher
job.monitor.email.publisher.id=EmailBasedProducer
job.monitor.realtime.publisher.id=RealtimeProducer
job.monitor.broker.topic=monitoring-data
-job.monitor.broker.url=airavata.host:9092
+job.monitor.broker.consumer.group=MonitoringConsumer
+
job.notification.emailids=
job.notification.enable=true
job.status.publish.endpoint=http://airavata.host:8082/topics/helix-airavata-mq
job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
kafka.broker.url=airavata.host:9092
-kafka.broker.consumer.group=MonitoringConsumer
-kafka.broker.topic=monitoring-data
-kafka.parsing.broker.url=airavata.host:9092
-kafka.parser.broker.consumer.group=ParsingConsumer
-kafka.parser.topic=parsing-data
+
+data.parser.broker.consumer.group=ParsingConsumer
+data.parser.topic=parsing-data
local.data.location=/tmp
@@ -109,7 +108,7 @@ orchestrator.server.min.threads=50
orchestrator.server.port=8940
orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
-parser.storage.resource.id=CHANGE_ME
+data.parser.storage.resource.id=CHANGE_ME
participant.monitoring.enabled=true
participant.monitoring.host=airavata.host
@@ -146,7 +145,6 @@ prefetch.count=200
realtime.monitor.broker.consumer.group=monitor
realtime.monitor.broker.topic=helix-airavata-mq
-realtime.monitor.broker.url=airavata.host:9092
registry.jdbc.driver=org.mariadb.jdbc.Driver
registry.jdbc.password=123456
diff --git
a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
index c097120d0d..649816abce 100644
---
a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
@@ -51,11 +51,11 @@ zookeeper.timeout=30000
###########################################################################
# Data Parser Configurations
###########################################################################
-kafka.parsing.broker.url={{ parser_broker_url }}
-kafka.parser.broker.consumer.group={{ parser_broker_consumer_group }}
-kafka.parser.topic={{ parser_broker_topic }}
-parser.storage.resource.id={{ parser_storage_resource_id }}
-kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
+kafka.broker.url={{ parser_broker_url }}
+data.parser.broker.consumer.group={{ parser_broker_consumer_group }}
+data.parser.topic={{ parser_broker_topic }}
+data.parser.broker.publisher.id={{ parser_broker_publisher_id }}
+data.parser.storage.resource.id={{ parser_storage_resource_id }}
post.workflow.manager.loadbalance.clusters=False
###########################################################################
diff --git
a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index 63fd7be85d..d652a69712 100644
---
a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -72,9 +72,9 @@ zookeeper.timeout=30000
###########################################################################
# Data Parser Configurations
###########################################################################
-kafka.parsing.broker.url={{ parser_broker_url }}
-kafka.parser.topic={{ parser_broker_topic }}
-kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
+kafka.broker.url={{ parser_broker_url }}
+data.parser.topic={{ parser_broker_topic }}
+data.parser.broker.publisher.id={{ parser_broker_publisher_id }}
###########################################################################
# Job Submission Task Level Configurations
diff --git
a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
index 06833e03e8..43aa33f2d2 100644
---
a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
@@ -26,8 +26,8 @@ regserver.server.port={{ registry_port }}
# Helix workflow manager configurations
###########################################################################
kafka.broker.url={{ job_monitor_broker_url }}
-kafka.broker.topic={{ job_monitor_broker_topic }}
-kafka.broker.consumer.group={{ job_monitor_broker_consumer_group }}
+job.monitor.broker.topic={{ job_monitor_broker_topic }}
+job.monitor.broker.consumer.group={{ job_monitor_broker_consumer_group }}
helix.cluster.name={{ helix_cluster_name }}
post.workflow.manager.name={{ helix_post_wm_name }}
post.workflow.manager.loadbalance.clusters={{
helix_post_wm_load_balance_clusters }}
diff --git
a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
index 4bc1b99bd2..cfda787ec0 100644
---
a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
@@ -27,7 +27,7 @@ email.based.monitor.store.protocol=imaps
email.based.monitoring.period=10000
#These properties will be used to publish parsed email messages to the job
monitor queue
-job.monitor.broker.url={{ job_monitor_broker_url }}
+kafka.broker.url={{ job_monitor_broker_url }}
job.monitor.broker.topic={{ job_monitor_broker_topic }}
job.monitor.email.publisher.id={{ email_job_monitor_broker_publisher }}
diff --git
a/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
b/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
index 1324453969..d1d14f98c2 100644
---
a/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
@@ -19,11 +19,10 @@
zookeeper.server.connection={{ zookeeper_connection_url }}
zookeeper.timeout=30000
-realtime.monitor.broker.url={{ realtime_monitor_broker_url }}
-realtime.monitor.broker.consumer.group={{
realtime_monitor_broker_consumer_group }}
+kafka.broker.url={{ job_monitor_broker_url }}
+realtime.monitor.broker.consumer.group={{realtime_monitor_broker_consumer_group
}}
realtime.monitor.broker.topic={{ realtime_monitor_broker_topic }}
-job.monitor.broker.url={{ job_monitor_broker_url }}
job.monitor.broker.topic={{ job_monitor_broker_topic }}
job.monitor.realtime.publisher.id={{ realtime_monitor_broker_publisher }}
diff --git
a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
index 217a82d8cd..1680928188 100644
---
a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
+++
b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
@@ -52,10 +52,7 @@ public class ProxyController {
producer = new KafkaProducer<>(props);
}
- @PostMapping(
- value = "/topics/{topic}",
- consumes = "application/vnd.kafka.json.v2+json",
- produces = "application/vnd.kafka.v2+json")
+ @PostMapping(value = "/topics/{topic}", consumes =
"application/vnd.kafka.json.v2+json")
public ResponseEntity<?> postToKafka(@PathVariable("topic") String topic,
@RequestBody String body) {
try {
JsonNode root = objectMapper.readTree(body);