This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new c407744 Avoiding the same event concurrently running
c407744 is described below
commit c40774424d066d1c5d5639c0f8924fbe81113401
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Mon Aug 23 11:05:40 2021 -0400
Avoiding the same event concurrently running
---
.../handlers/async/OrchestratorEventHandler.java | 17 ++++++++++++++---
.../handlers/async/OrchestratorEventProcessor.java | 7 ++++++-
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index d6d25bc..d706c16 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -21,9 +21,10 @@ import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -40,6 +41,7 @@ public class OrchestratorEventHandler {
private ExecutorService executorService;
private ScheduledExecutorService ouboundExecutorService;
private MessageConsumer messageConsumer;
+ private final Set<String> eventCache = new HashSet<>();
public OrchestratorEventHandler() {
}
@@ -56,9 +58,18 @@ public class OrchestratorEventHandler {
public void startProcessing() throws Exception {
messageConsumer.consume((notificationEvent -> {
- LOGGER.info("Message received for resource path {}",
notificationEvent.getResourcePath());
+ LOGGER.info("Message received for resource path {} type {}",
notificationEvent.getResourcePath(),
+ notificationEvent.getEventType());
try {
- this.executorService.submit(new
OrchestratorEventProcessor(configuration, notificationEvent));
+ if (!eventCache.contains(notificationEvent.getResourcePath() +
":" + notificationEvent.getHostName())) {
+ eventCache.add(notificationEvent.getResourcePath() + ":" +
notificationEvent.getHostName());
+ this.executorService.submit(new OrchestratorEventProcessor(
+ configuration, notificationEvent, eventCache));
+ } else {
+ LOGGER.info("Event is already processing");
+ }
+
+
} catch (Exception e) {
LOGGER.error("Failed tu submit data orchestrator event to
process on path {}",
notificationEvent.getResourcePath(), e);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 6fc7d45..af8bac0 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -46,9 +46,12 @@ public class OrchestratorEventProcessor implements Runnable {
private DRMSConnector drmsConnector;
private Configuration configuration;
private WorkflowServiceConnector workflowServiceConnector;
+ private final Set<String> eventCache;
- public OrchestratorEventProcessor(Configuration configuration,
NotificationEvent notificationEvent) throws Exception {
+ public OrchestratorEventProcessor(Configuration configuration,
NotificationEvent notificationEvent,
+ Set<String> eventCache) throws Exception
{
this.notificationEvent = notificationEvent;
+ this.eventCache = eventCache;
this.drmsConnector = new DRMSConnector(configuration);
this.workflowServiceConnector = new
WorkflowServiceConnector(configuration);
this.configuration = configuration;
@@ -255,6 +258,8 @@ public class OrchestratorEventProcessor implements Runnable {
} catch (Exception e) {
logger.error("Failed to process event for resource path {}",
notificationEvent.getResourcePath(), e);
+ } finally {
+ this.eventCache.remove(notificationEvent.getResourcePath() + ":" +
notificationEvent.getHostName());
}
}
}