This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 8303f033761976873d9e63a4bf8682bf32c9c372
Author: yasithdev <[email protected]>
AuthorDate: Mon Jul 14 20:37:47 2025 -0500

    simplify repeated properties and organize some property loads.
---
 .devcontainer/docker-compose-alt.yml               |  2 +-
 .../impl/task/parsing/ParsingTriggeringTask.java   |  8 +--
 .../helix/impl/workflow/ParserWorkflowManager.java |  8 +--
 .../helix/impl/workflow/PostWorkflowManager.java   |  4 +-
 .../airavata/monitor/email/EmailBasedMonitor.java  | 17 +++---
 .../airavata/monitor/kafka/MessageProducer.java    | 11 ++--
 .../airavata/monitor/realtime/RealtimeMonitor.java | 14 +++--
 .../realtime/parser/RealtimeJobStatusParser.java   | 68 ++++++--------------
 .../src/main/resources/airavata-server.properties  | 14 ++---
 .../parser-wm/airavata-server.properties.j2        | 10 ++--
 .../participant/airavata-server.properties.j2      |  6 +-
 .../post-wm/airavata-server.properties.j2          |  4 +-
 .../email-monitor/airavata-server.properties.j2    |  2 +-
 .../realtime-monitor/airavata-server.properties.j2 |  5 +-
 .../restproxy/controller/ProxyController.java      |  5 +-
 15 files changed, 79 insertions(+), 99 deletions(-)

diff --git a/.devcontainer/docker-compose-alt.yml b/.devcontainer/docker-compose-alt.yml
index b105917d60..d1e1e919f2 100644
--- a/.devcontainer/docker-compose-alt.yml
+++ b/.devcontainer/docker-compose-alt.yml
@@ -218,7 +218,7 @@ services:
       - regserver.server.port=8970
       - email.based.monitor.address=CHANGEME
       - email.based.monitor.password=CHANGEME
-      - job.monitor.broker.url=kafka:9092
+      - kafka.broker.url=kafka:9092
     command: ["/tmp/wait-for-it.sh", "zookeeper:2181", "--", "/tmp/wait-for-it.sh", "apiserver:8970", "--" , "/tmp/wait-for-it.sh", "kafka:9092", "--", "/opt/apache-airavata-email-monitor/bin/email-monitor.sh"]

   db:
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index 53cc488304..f0d3ea37b6 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -45,8 +45,8 @@ public class ParsingTriggeringTask extends AiravataTask {

         if (producer == null) {
             Properties props = new Properties();
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.url"));
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.publisher.id"));
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("data.parser.broker.publisher.id"));
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessCompletionMessageSerializer.class.getName());
             producer = new KafkaProducer<String, ProcessCompletionMessage>(props);
@@ -56,9 +56,7 @@ public class ParsingTriggeringTask extends AiravataTask {
     public void submitMessageToParserEngine(ProcessCompletionMessage completionMessage)
             throws ExecutionException, InterruptedException, ApplicationSettingsException {
         final ProducerRecord<String, ProcessCompletionMessage> record = new ProducerRecord<>(
-                ServerSettings.getSetting("kafka.parser.topic"),
-                completionMessage.getExperimentId(),
-                completionMessage);
+                ServerSettings.getSetting("data.parser.topic"), completionMessage.getExperimentId(), completionMessage);
         RecordMetadata recordMetadata = producer.send(record).get();
         producer.flush();
     }
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 688e82a037..e75f494210 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -56,7 +56,7 @@ public class ParserWorkflowManager extends WorkflowManager {

     private static final Logger logger = LoggerFactory.getLogger(ParserWorkflowManager.class);
     private static final CountMonitor parserwfCounter = new CountMonitor("parser_wf_counter");
-    private String parserStorageResourceId = ServerSettings.getSetting("parser.storage.resource.id");
+    private String parserStorageResourceId = ServerSettings.getSetting("data.parser.storage.resource.id");

     public ParserWorkflowManager() throws ApplicationSettingsException {
         super(
@@ -440,14 +440,14 @@ public class ParserWorkflowManager extends WorkflowManager {

         final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.parsing.broker.url"));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.parser.broker.consumer.group"));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("data.parser.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProcessCompletionMessageDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

         final Consumer<String, ProcessCompletionMessage> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.parser.topic")));
+        consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("data.parser.topic")));

         logger.info("Starting the kafka consumer..");
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 2dc6fb15a3..8fe307cbae 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -92,7 +92,7 @@ public class PostWorkflowManager extends WorkflowManager {
     private Consumer<String, JobStatusResult> createConsumer() throws ApplicationSettingsException {
         final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("kafka.broker.consumer.group"));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("job.monitor.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -100,7 +100,7 @@ public class PostWorkflowManager extends WorkflowManager {
         // Create the consumer using props.
         final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<>(props);
         // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.broker.topic")));
+        consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("job.monitor.broker.topic")));
         return consumer;
     }
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index b56ddb1918..7e36df285d 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -62,6 +62,7 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
     private Message[] flushUnseenMessages;
    private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new HashMap<>();
     private long emailExpirationTimeMinutes;
+    private String publisherId;

     public EmailBasedMonitor() throws Exception {
         init();
@@ -76,6 +77,7 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
         folderName = ServerSettings.getEmailBasedMonitorFolderName();
         emailExpirationTimeMinutes = Long.parseLong(ServerSettings.getSetting("email.expiration.minutes"));
+        publisherId = ServerSettings.getSetting("job.monitor.email.publisher.id");
         if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) {
             throw new AiravataException(
                     "Unsupported store protocol , expected " + IMAPS + " or " + POP3 + " but found " + storeProtocol);
@@ -143,7 +145,7 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         log.info("[EJM]: Added monitor Id : {} to email based monitor map", jobId);
     }

-    private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
+    private JobStatusResult parse(Message message, String publisherId) throws MessagingException, AiravataException {
         Address fromAddress = message.getFrom()[0];
         String addressStr = fromAddress.toString();
         ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
@@ -156,7 +158,11 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         try {
             JobStatusResult jobStatusResult = emailParser.parseEmail(message, regClient);
-            jobStatusResult.setPublisherName(ServerSettings.getSetting("job.monitor.email.publisher.id"));
+            jobStatusResult.setPublisherName(publisherId);
+            var jobId = jobStatusResult.getJobId();
+            var jobName = jobStatusResult.getJobName();
+            var jobStatus = jobStatusResult.getState().getValue();
+            log.info("Parsed Job Status: From=[{}], Id={}, Name={}, State={}", publisherId, jobId, jobName, jobStatus);
             return jobStatusResult;
         } catch (Exception e) {
             getRegistryClientPool().returnBrokenResource(regClient);
@@ -252,11 +258,8 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         List<Message> unreadMessages = new ArrayList<>();
         for (Message message : searchMessages) {
             try {
-                log.info("Parsing the job status message");
-                JobStatusResult jobStatusResult = parse(message);
-                log.info("Job message parsed. Job Id " + jobStatusResult.getJobId() + ", Job Name "
-                        + jobStatusResult.getJobName() + ", Job State "
-                        + jobStatusResult.getState().getValue());
+                log.info("Received Job Status [{}]: {}", publisherId, message);
+                JobStatusResult jobStatusResult = parse(message, publisherId);
                 submitJobStatus(jobStatusResult);
                 log.info("Submitted the job {} status to queue", jobStatusResult.getJobId());
                 processedMessages.add(message);
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 4320030a40..0f6967a5ef 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -30,24 +30,25 @@ import org.apache.kafka.common.serialization.StringSerializer;

 public class MessageProducer {
     final Producer<String, JobStatusResult> producer;
+    final String jobMonitorQueue;

     public MessageProducer() throws ApplicationSettingsException {
         producer = createProducer();
+        jobMonitorQueue = ServerSettings.getSetting("job.monitor.broker.topic");
     }

     private Producer<String, JobStatusResult> createProducer() throws ApplicationSettingsException {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("job.monitor.broker.url"));
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
         props.put(ProducerConfig.CLIENT_ID_CONFIG, ServerSettings.getSetting("job.monitor.broker.publisher.id"));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JobStatusResultSerializer.class.getName());
         return new KafkaProducer<String, JobStatusResult>(props);
     }

-    public void submitMessageToQueue(JobStatusResult jobStatusResult)
-            throws ExecutionException, InterruptedException, ApplicationSettingsException {
-        final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(
-                ServerSettings.getSetting("job.monitor.broker.topic"), jobStatusResult.getJobId(), jobStatusResult);
+    public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
+        var jobId = jobStatusResult.getJobId();
+        final var record = new ProducerRecord<>(jobMonitorQueue, jobId, jobStatusResult);
         RecordMetadata recordMetadata = producer.send(record).get();
         producer.flush();
     }
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 16378a4459..7efddab18f 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -40,22 +40,26 @@ public class RealtimeMonitor extends AbstractMonitor {

     private static final Logger logger = LoggerFactory.getLogger(RealtimeMonitor.class);
-    private RealtimeJobStatusParser parser;
+    private final RealtimeJobStatusParser parser;
+    private final String publisherId;
+    private final String brokerTopic;

     public RealtimeMonitor() throws ApplicationSettingsException {
         parser = new RealtimeJobStatusParser();
+        publisherId = ServerSettings.getSetting("job.monitor.realtime.publisher.id");
+        brokerTopic = ServerSettings.getSetting("realtime.monitor.broker.topic");
     }

     private Consumer<String, String> createConsumer() throws ApplicationSettingsException {
         final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("realtime.monitor.broker.url"));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerSettings.getSetting("kafka.broker.url"));
         props.put(ConsumerConfig.GROUP_ID_CONFIG, ServerSettings.getSetting("realtime.monitor.broker.consumer.group"));
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         // Create the consumer using props.
         final Consumer<String, String> consumer = new KafkaConsumer<>(props);
         // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("realtime.monitor.broker.topic")));
+        consumer.subscribe(Collections.singletonList(brokerTopic));
         return consumer;
     }
@@ -81,8 +85,8 @@ public class RealtimeMonitor extends AbstractMonitor {
     }

     private void process(String value, RegistryService.Client registryClient) throws MonitoringException {
-        logger.info("Received data " + value);
-        JobStatusResult statusResult = parser.parse(value, registryClient);
+        logger.info("Received Job Status [{}]: {}", publisherId, value);
+        JobStatusResult statusResult = parser.parse(value, publisherId, registryClient);
         if (statusResult != null) {
             logger.info("Submitting message to job monitor queue");
             submitJobStatus(statusResult);
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
index 6cb72d0913..4a109f065c 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/parser/RealtimeJobStatusParser.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonSyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.monitor.JobStatusResult;
@@ -41,9 +40,9 @@ public class RealtimeJobStatusParser {

         for (int i = 0; i < 3; i++) {
             List<JobModel> jobsOfTask = registryClient.getJobs("taskId", taskId);
-            if (jobsOfTask == null || jobsOfTask.size() == 0) {
+            if (jobsOfTask == null || jobsOfTask.isEmpty()) {
                 // Retry after 2s
-                logger.warn("No jobs for task " + taskId + ". Retrying in 2 seconds");
+                logger.warn("No jobs for task {}. Retrying in 2 seconds", taskId);
                 Thread.sleep(2000);
             } else {
                 Optional<JobModel> filtered = jobsOfTask.stream()
@@ -52,7 +51,7 @@ public class RealtimeJobStatusParser {
                 if (filtered.isPresent()) {
                     return filtered.get().getJobId();
                 } else {
-                    logger.warn("No job for job name " + jobName + " and task " + taskId + ". Retrying in 2 seconds");
+                    logger.warn("No job for job name {} and task {}. Retrying in 2 seconds", jobName, taskId);
                     Thread.sleep(2000);
                 }
             }
@@ -60,7 +59,7 @@ public class RealtimeJobStatusParser {
         return null;
     }

-    public JobStatusResult parse(String rawMessage, RegistryService.Client registryClient) {
+    public JobStatusResult parse(String rawMessage, String publisherId, RegistryService.Client registryClient) {
         try {
             Map asMap = new Gson().fromJson(rawMessage, Map.class);
@@ -74,44 +73,26 @@ public class RealtimeJobStatusParser {
                 try {
                     String jobId = getJobIdIdByJobNameWithRetry(jobName, taskId, registryClient);
                     if (jobId == null) {
-                        logger.error("No job id for job name " + jobName);
+                        logger.error("No job id for job name {}", jobName);
                         return null;
                     }

-                    JobState jobState = null;
-
-                    switch (status) {
-                        case "RUNNING":
-                            jobState = JobState.ACTIVE;
-                            break;
-                        case "COMPLETED":
-                            jobState = JobState.COMPLETE;
-                            break;
-                        case "FAILED":
-                            jobState = JobState.FAILED;
-                            break;
-                        case "SUBMITTED":
-                            jobState = JobState.SUBMITTED;
-                            break;
-                        case "QUEUED":
-                            jobState = JobState.QUEUED;
-                            break;
-                        case "CANCELED":
-                            jobState = JobState.CANCELED;
-                            break;
-                        case "SUSPENDED":
-                            jobState = JobState.SUSPENDED;
-                            break;
-                        case "UNKNOWN":
-                            jobState = JobState.UNKNOWN;
-                            break;
-                        case "NON_CRITICAL_FAIL":
-                            jobState = JobState.NON_CRITICAL_FAIL;
-                            break;
-                    }
+                    JobState jobState =
+                            switch (status) {
+                                case "RUNNING" -> JobState.ACTIVE;
+                                case "COMPLETED" -> JobState.COMPLETE;
+                                case "FAILED" -> JobState.FAILED;
+                                case "SUBMITTED" -> JobState.SUBMITTED;
+                                case "QUEUED" -> JobState.QUEUED;
+                                case "CANCELED" -> JobState.CANCELED;
+                                case "SUSPENDED" -> JobState.SUSPENDED;
+                                case "UNKNOWN" -> JobState.UNKNOWN;
+                                case "NON_CRITICAL_FAIL" -> JobState.NON_CRITICAL_FAIL;
+                                default -> null;
+                            };

                     if (jobState == null) {
-                        logger.error("Invalid job state " + status);
+                        logger.error("Invalid job state {}", status);
                         return null;
                     }
@@ -119,23 +100,22 @@ public class RealtimeJobStatusParser {
                     jobStatusResult.setJobId(jobId);
                     jobStatusResult.setJobName(jobName);
                     jobStatusResult.setState(jobState);
-                    jobStatusResult.setPublisherName(
-                            ServerSettings.getSetting("job.monitor.realtime.publisher.id"));
+                    jobStatusResult.setPublisherName(publisherId);
                     return jobStatusResult;

                 } catch (Exception e) {
-                    logger.error("Failed to fetch job id for job name " + jobName);
+                    logger.error("Failed to fetch job id for job name {}", jobName);
                     return null;
                 }
             } else {
-                logger.error("Job name, taskId or status is null in message " + rawMessage);
+                logger.error("Job name, taskId or status is null in message {}", rawMessage);
                 return null;
             }
         } else {
-            logger.error("Data structure of message " + rawMessage + " is not correct");
+            logger.error("Data structure of message {} is not correct", rawMessage);
             return null;
         }
     } catch (JsonSyntaxException e) {
-        logger.error("Failed to parse raw data " + rawMessage + " to type Map", e);
+        logger.error("Failed to parse raw data {} to type Map", rawMessage, e);
         return null;
     }
 }
diff --git a/airavata-api/src/main/resources/airavata-server.properties b/airavata-api/src/main/resources/airavata-server.properties
index abe4425f36..7da7ff8ebf 100644
--- a/airavata-api/src/main/resources/airavata-server.properties
+++ b/airavata-api/src/main/resources/airavata-server.properties
@@ -87,18 +87,17 @@ job.monitor.broker.publisher.id=AiravataMonitorPublisher
 job.monitor.email.publisher.id=EmailBasedProducer
 job.monitor.realtime.publisher.id=RealtimeProducer
 job.monitor.broker.topic=monitoring-data
-job.monitor.broker.url=airavata.host:9092
+job.monitor.broker.consumer.group=MonitoringConsumer
+
 job.notification.emailids=
 job.notification.enable=true
 job.status.publish.endpoint=http://airavata.host:8082/topics/helix-airavata-mq
 job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator

 kafka.broker.url=airavata.host:9092
-kafka.broker.consumer.group=MonitoringConsumer
-kafka.broker.topic=monitoring-data
-kafka.parsing.broker.url=airavata.host:9092
-kafka.parser.broker.consumer.group=ParsingConsumer
-kafka.parser.topic=parsing-data
+
+data.parser.broker.consumer.group=ParsingConsumer
+data.parser.topic=parsing-data

 local.data.location=/tmp

@@ -109,7 +108,7 @@ orchestrator.server.min.threads=50
 orchestrator.server.port=8940
 orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer

-parser.storage.resource.id=CHANGE_ME
+data.parser.storage.resource.id=CHANGE_ME

 participant.monitoring.enabled=true
 participant.monitoring.host=airavata.host
@@ -146,7 +145,6 @@ prefetch.count=200

 realtime.monitor.broker.consumer.group=monitor
 realtime.monitor.broker.topic=helix-airavata-mq
-realtime.monitor.broker.url=airavata.host:9092

 registry.jdbc.driver=org.mariadb.jdbc.Driver
 registry.jdbc.password=123456
diff --git a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
index c097120d0d..649816abce 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
@@ -51,11 +51,11 @@ zookeeper.timeout=30000
 ###########################################################################
 # Data Parser Configurations
 ###########################################################################
-kafka.parsing.broker.url={{ parser_broker_url }}
-kafka.parser.broker.consumer.group={{ parser_broker_consumer_group }}
-kafka.parser.topic={{ parser_broker_topic }}
-parser.storage.resource.id={{ parser_storage_resource_id }}
-kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
+kafka.broker.url={{ parser_broker_url }}
+data.parser.broker.consumer.group={{ parser_broker_consumer_group }}
+data.parser.topic={{ parser_broker_topic }}
+data.parser.broker.publisher.id={{ parser_broker_publisher_id }}
+data.parser.storage.resource.id={{ parser_storage_resource_id }}
 post.workflow.manager.loadbalance.clusters=False

 ###########################################################################
diff --git a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index 63fd7be85d..d652a69712 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -72,9 +72,9 @@ zookeeper.timeout=30000
 ###########################################################################
 # Data Parser Configurations
 ###########################################################################
-kafka.parsing.broker.url={{ parser_broker_url }}
-kafka.parser.topic={{ parser_broker_topic }}
-kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
+kafka.broker.url={{ parser_broker_url }}
+data.parser.topic={{ parser_broker_topic }}
+data.parser.broker.publisher.id={{ parser_broker_publisher_id }}

 ###########################################################################
 # Job Submission Task Level Configurations
diff --git a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
index 06833e03e8..43aa33f2d2 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
@@ -26,8 +26,8 @@ regserver.server.port={{ registry_port }}
 # Helix workflow manager configurations
 ###########################################################################
 kafka.broker.url={{ job_monitor_broker_url }}
-kafka.broker.topic={{ job_monitor_broker_topic }}
-kafka.broker.consumer.group={{ job_monitor_broker_consumer_group }}
+job.monitor.broker.topic={{ job_monitor_broker_topic }}
+job.monitor.broker.consumer.group={{ job_monitor_broker_consumer_group }}
 helix.cluster.name={{ helix_cluster_name }}
 post.workflow.manager.name={{ helix_post_wm_name }}
 post.workflow.manager.loadbalance.clusters={{ helix_post_wm_load_balance_clusters }}
diff --git a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2 b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
index 4bc1b99bd2..cfda787ec0 100644
--- a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
@@ -27,7 +27,7 @@ email.based.monitor.store.protocol=imaps
 email.based.monitoring.period=10000

 #These properties will be used to published parsed email messages to job monitor queue
-job.monitor.broker.url={{ job_monitor_broker_url }}
+kafka.broker.url={{ job_monitor_broker_url }}
 job.monitor.broker.topic={{ job_monitor_broker_topic }}
 job.monitor.email.publisher.id={{ email_job_monitor_broker_publisher }}
diff --git a/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2 b/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
index 1324453969..d1d14f98c2 100644
--- a/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/job_monitor/templates/realtime-monitor/airavata-server.properties.j2
@@ -19,11 +19,10 @@ zookeeper.server.connection={{ zookeeper_connection_url }}
 zookeeper.timeout=30000

-realtime.monitor.broker.url={{ realtime_monitor_broker_url }}
-realtime.monitor.broker.consumer.group={{ realtime_monitor_broker_consumer_group }}
+kafka.broker.url={{ job_monitor_broker_url }}
+realtime.monitor.broker.consumer.group={{ realtime_monitor_broker_consumer_group }}
 realtime.monitor.broker.topic={{ realtime_monitor_broker_topic }}

-job.monitor.broker.url={{ job_monitor_broker_url }}
 job.monitor.broker.topic={{ job_monitor_broker_topic }}
 job.monitor.realtime.publisher.id={{ realtime_monitor_broker_publisher }}
diff --git a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
index 217a82d8cd..1680928188 100644
--- a/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
+++ b/modules/restproxy/src/main/java/org/apache/airavata/restproxy/controller/ProxyController.java
@@ -52,10 +52,7 @@ public class ProxyController {
         producer = new KafkaProducer<>(props);
     }

-    @PostMapping(
-            value = "/topics/{topic}",
-            consumes = "application/vnd.kafka.json.v2+json",
-            produces = "application/vnd.kafka.v2+json")
+    @PostMapping(value = "/topics/{topic}", consumes = "application/vnd.kafka.json.v2+json")
     public ResponseEntity<?> postToKafka(@PathVariable("topic") String topic, @RequestBody String body) {
         try {
             JsonNode root = objectMapper.readTree(body);
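
A minimal sketch (not part of the commit) of the consumer wiring that results from this change: one shared kafka.broker.url bootstrap setting, with the per-component topic and group kept under their own namespaces (job.monitor.broker.*, data.parser.*, realtime.monitor.broker.*). The class name, the String value type, and the hard-coded values in main() are illustrative assumptions; the property values match the defaults in airavata-server.properties above.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MonitoringConsumerSketch {

    // Mirrors PostWorkflowManager.createConsumer() after this commit: the broker
    // URL is shared across components, while group and topic stay per-component.
    static Consumer<String, String> createJobMonitorConsumer(String brokerUrl, String group, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); // kafka.broker.url
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group); // job.monitor.broker.consumer.group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic)); // job.monitor.broker.topic
        return consumer;
    }

    public static void main(String[] args) {
        // Hypothetical values, taken from the defaults in airavata-server.properties.
        try (Consumer<String, String> consumer =
                createJobMonitorConsumer("airavata.host:9092", "MonitoringConsumer", "monitoring-data")) {
            System.out.println("subscribed to: " + consumer.subscription());
        }
    }
}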
