This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new c665a66  Directory monitor to auto detect new directory creations
c665a66 is described below

commit c665a6607c0b7bb43755a018467a27c4d360c34e
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sat Oct 30 04:02:14 2021 -0400

    Directory monitor to auto detect new directory creations
---
 .../directory-monitor/pom.xml                      |  84 ++++++++++++
 .../src/main/dist/bin/dir-monitor-daemon.sh        | 113 ++++++++++++++++
 .../src/main/dist/bin/dir-monitor.sh               |  71 ++++++++++
 .../directory-monitor/src/main/dist/bin/setenv.sh  |  46 +++++++
 .../src/main/dist/conf/application.properties      |  28 ++++
 .../src/main/dist/conf/logback.xml                 |  50 +++++++
 .../src/main/dist/dir-monitor-bin-assembly.xml     |  85 ++++++++++++
 .../datalake/dmonitor/DirectoryMonitor.java        | 145 +++++++++++++++++++++
 .../airavata/datalake/dmonitor/EventNotifier.java  | 100 ++++++++++++++
 .../datalake/dmonitor/SaturationGauge.java         | 111 ++++++++++++++++
 .../src/main/resources/application.properties      |  28 ++++
 .../src/main/resources/logback.xml                 |  50 +++++++
 .../data-orchestrator-clients/pom.xml              |  22 ++++
 .../handlers/async/OrchestratorEventProcessor.java |   6 +
 .../src/main/proto/notification.proto              |   3 +-
 pom.xml                                            |   4 +-
 16 files changed, 943 insertions(+), 3 deletions(-)

diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/pom.xml 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/pom.xml
new file mode 100644
index 0000000..265072d
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-clients/directory-monitor/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>data-orchestrator-clients</artifactId>
+        <groupId>org.apache.airavata.data.lake</groupId>
+        <version>0.01-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>directory-monitor</artifactId>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+            <version>${spring.boot.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${log4j.over.slf4j}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>data-orchestrator-messaging</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven.assembly.plugin}</version>
+                <executions>
+                    <execution>
+                        <id>mft-controller-distribution-package</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                            
<finalName>Dir-Monitor-${project.version}</finalName>
+                            <descriptors>
+                                
<descriptor>src/main/dist/dir-monitor-bin-assembly.xml</descriptor>
+                            </descriptors>
+                            <attach>false</attach>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor-daemon.sh
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor-daemon.sh
new file mode 100644
index 0000000..2a549f6
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor-daemon.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+. `dirname $0`/setenv.sh
+# Capture user's working dir before changing directory
+CWD="$PWD"
+cd ${AIRAVATA_HOME}/bin
+LOGO_FILE="logo.txt"
+
+JAVA_OPTS="-Dspring.config.location=${AIRAVATA_HOME}/conf/ 
-Dairavata.home=${AIRAVATA_HOME} 
-Dlogback.configurationFile=file:${AIRAVATA_HOME}/conf/logback.xml"
+AIRAVATA_COMMAND=""
+EXTRA_ARGS=""
+SERVERS=""
+LOGO=true
+IS_SUBSET=false
+SUBSET=""
+DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/airavata-daemon.out"
+LOG_FILE=$DEFAULT_LOG_FILE
+
+SERVICE_NAME="Directory Monitor"
+PID_PATH_NAME="${AIRAVATA_HOME}/bin/service-pid"
+
+case $1 in
+    start)
+        echo "Starting $SERVICE_NAME ..."
+        if [ ! -f $PID_PATH_NAME ]; then
+            nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
+            org.apache.airavata.datalake.dmonitor.DirectoryMonitor 
${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+            echo $! > $PID_PATH_NAME
+            echo "$SERVICE_NAME started ..."
+        else
+            echo "$SERVICE_NAME is already running ..."
+        fi
+    ;;
+    stop)
+        if [ -f $PID_PATH_NAME ]; then
+            PID=$(cat $PID_PATH_NAME);
+            echo "$SERVICE_NAME stoping ..."
+            kill $PID;
+            RETRY=0
+            while kill -0 $PID 2> /dev/null; do
+                echo "Waiting for the process $PID to be stopped"
+                RETRY=`expr ${RETRY} + 1`
+                if [ "${RETRY}" -gt "20" ]
+                then
+                    echo "Forcefully killing the process as it is not 
responding ..."
+                    kill -9 $PID
+                fi
+                sleep 1
+            done
+            echo "$SERVICE_NAME stopped ..."
+            rm $PID_PATH_NAME
+        else
+            echo "$SERVICE_NAME is not running ..."
+        fi
+    ;;
+    restart)
+        if [ -f $PID_PATH_NAME ]; then
+            PID=$(cat $PID_PATH_NAME);
+            echo "$SERVICE_NAME stopping ...";
+            kill $PID;
+            RETRY=0
+            while kill -0 $PID 2> /dev/null; do
+                echo "Waiting for the process $PID to be stopped"
+                RETRY=`expr ${RETRY} + 1`
+                if [ "${RETRY}" -gt "20" ]
+                then
+                    echo "Forcefully killing the process as it is not 
responding ..."
+                    kill -9 $PID
+                fi
+                sleep 1
+            done
+            echo "$SERVICE_NAME stopped ...";
+            rm $PID_PATH_NAME
+            echo "$SERVICE_NAME starting ..."
+            nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
+            org.apache.airavata.datalake.dmonitor.DirectoryMonitor 
${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+            echo $! > $PID_PATH_NAME
+            echo "$SERVICE_NAME started ..."
+        else
+            echo "$SERVICE_NAME is not running ..."
+        fi
+    ;;
+    -h)
+        echo "Usage: dir-monitor-daemon.sh"
+
+        echo "command options:"
+        echo "  start               Start server in daemon mode"
+        echo "  stop                Stop server running in daemon mode"
+        echo "  restart             Restart server in daemon mode"
+           echo "  -log <LOG_FILE>     Where to redirect stdout/stderr 
(defaults to $DEFAULT_LOG_FILE)"
+        echo "  -h                  Display this help and exit"
+        shift
+        exit 0
+    ;;
+esac
+
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor.sh
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor.sh
new file mode 100644
index 0000000..902d6ea
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+. `dirname $0`/setenv.sh
+# Capture user's working dir before changing directory
+CWD="$PWD"
+cd ${AIRAVATA_HOME}/bin
+LOGO_FILE="logo.txt"
+
+JAVA_OPTS="-Dspring.config.location=${AIRAVATA_HOME}/conf/ 
-Dairavata.home=${AIRAVATA_HOME} 
-Dlogback.configurationFile=file:${AIRAVATA_HOME}/conf/logback.xml"
+AIRAVATA_COMMAND=""
+EXTRA_ARGS=""
+SERVERS=""
+IS_SUBSET=false
+SUBSET=""
+DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/console.out"
+LOG_FILE=$DEFAULT_LOG_FILE
+
+# parse command arguments
+for var in "$@"
+do
+    case ${var} in
+        -xdebug)
+               AIRAVATA_COMMAND="${AIRAVATA_COMMAND}"
+            JAVA_OPTS="$JAVA_OPTS -Xdebug -Xnoagent 
-Xrunjdwp:transport=dt_socket,server=y,address=*:8000"
+            shift
+           ;;
+        -log)
+            shift
+            LOG_FILE="$1"
+            shift
+            # If relative path, expand to absolute path using the user's $CWD
+            if [ -z "`echo "$LOG_FILE" | egrep "^/"`" ]; then
+                LOG_FILE="${CWD}/${LOG_FILE}"
+            fi
+        ;;
+        -h)
+            echo "Usage: dir-monitor.sh"
+
+            echo "command options:"
+            echo "  -xdebug             Start Directory Monitor under JPDA 
debugger"
+            echo "  -h                  Display this help and exit"
+            shift
+            exit 0
+        ;;
+           *)
+               EXTRA_ARGS="${EXTRA_ARGS} ${var}"
+            shift
+        ;;
+    esac
+done
+
+java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
+    org.apache.airavata.datalake.dmonitor.DirectoryMonitor ${AIRAVATA_COMMAND} 
$*
+
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/setenv.sh
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/setenv.sh
new file mode 100755
index 0000000..9e894e1
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/setenv.sh
@@ -0,0 +1,46 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+  ls=`ls -ld "$PRG"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '.*/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRGDIR=`dirname "$PRG"`
+
+# Only set AIRAVATA_HOME if not already set
+[ -z "$AIRAVATA_HOME" ] && AIRAVATA_HOME=`cd "$PRGDIR/.." ; pwd`
+
+AIRAVATA_CLASSPATH=""
+
+for f in "$AIRAVATA_HOME"/lib/*.jar
+do
+  AIRAVATA_CLASSPATH="$AIRAVATA_CLASSPATH":$f
+done
+
+export AIRAVATA_HOME
+export AIRAVATA_CLASSPATH
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/application.properties
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/application.properties
new file mode 100644
index 0000000..67b7778
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/application.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+#
+tenant.id=
+base.path=/images/GatewayTests/workdir
+monitor.depth=2
+resource.type=FOLDER
+host.name=
+event.type=MODIFY
+auth.token=
+kafka.url=
+kafka.publisher.id=evenpub
+kafka.event.topic=data-orchestrator-file-events
\ No newline at end of file
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/logback.xml
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/logback.xml
new file mode 100644
index 0000000..3afe661
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="LOGFILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <File>../logs/airavata.log</File>
+        <Append>true</Append>
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            
<fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="ch.qos.logback" level="WARN"/>
+    <logger name="org.apache.helix" level="WARN"/>
+    <logger name="org.apache.zookeeper" level="ERROR"/>
+    <logger name="org.apache.helix" level="ERROR"/>
+    <logger name="org.apache.airavata" level="INFO"/>
+    <logger name="org.hibernate" level="ERROR"/>
+    <logger name="net.schmizz.sshj" level="WARN"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="LOGFILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/dir-monitor-bin-assembly.xml
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/dir-monitor-bin-assembly.xml
new file mode 100644
index 0000000..743414b
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/dir-monitor-bin-assembly.xml
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<!DOCTYPE assembly [
+        <!ELEMENT assembly 
(id|includeBaseDirectory|baseDirectory|formats|fileSets|dependencySets)*>
+        <!ELEMENT id (#PCDATA)>
+        <!ELEMENT includeBaseDirectory (#PCDATA)>
+        <!ELEMENT baseDirectory (#PCDATA)>
+        <!ELEMENT formats (format)*>
+        <!ELEMENT format (#PCDATA)>
+        <!ELEMENT fileSets (fileSet)*>
+        <!ELEMENT fileSet (directory|outputDirectory|fileMode|includes)*>
+        <!ELEMENT directory (#PCDATA)>
+        <!ELEMENT outputDirectory (#PCDATA)>
+        <!ELEMENT includes (include)*>
+        <!ELEMENT include (#PCDATA)>
+        <!ELEMENT dependencySets (dependencySet)*>
+        <!ELEMENT dependencySet 
(outputDirectory|outputFileNameMapping|includes)*>
+        ]>
+<assembly>
+
+    <id>bin</id>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <baseDirectory>Dir-Monitor-${project.version}</baseDirectory>
+    <formats>
+        <format>tar.gz</format>
+        <format>zip</format>
+    </formats>
+
+    <fileSets>
+        <fileSet>
+            <directory>src/main/dist/bin</directory>
+            <outputDirectory>bin</outputDirectory>
+            <fileMode>777</fileMode>
+            <includes>
+                <include>*.sh</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/dist/conf</directory>
+            <outputDirectory>conf</outputDirectory>
+            <includes>
+                <include>application.properties</include>
+                <include>logback.xml</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>./</directory>
+            <outputDirectory>logs</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+        </fileSet>
+        <fileSet>
+            <directory>target</directory>
+            <outputDirectory>lib</outputDirectory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>lib</outputDirectory>
+            <includes>
+                <include>*</include>
+            </includes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/DirectoryMonitor.java
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/DirectoryMonitor.java
new file mode 100644
index 0000000..fe8a7cb
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/DirectoryMonitor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.dmonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.*;
+import java.util.HashMap;
+import java.util.Map;
+
+@SpringBootApplication()
+public class DirectoryMonitor implements CommandLineRunner {
+
+    private final static Logger logger = 
LoggerFactory.getLogger(DirectoryMonitor.class);
+
+    @org.springframework.beans.factory.annotation.Value("${tenant.id}")
+    private String tenantId;
+
+    @org.springframework.beans.factory.annotation.Value("${base.path}")
+    private String basePath;
+
+    @org.springframework.beans.factory.annotation.Value("${monitor.depth}")
+    private int monitorDepth;
+
+    @org.springframework.beans.factory.annotation.Value("${resource.type}")
+    private String resourceType;
+
+    @org.springframework.beans.factory.annotation.Value("${host.name}")
+    private String hostName;
+
+    @org.springframework.beans.factory.annotation.Value("${event.type}")
+    private String eventType;
+
+    @org.springframework.beans.factory.annotation.Value("${auth.token}")
+    private String authToken;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.url}")
+    private String kafkaUrl;
+
+    
@org.springframework.beans.factory.annotation.Value("${kafka.publisher.id}")
+    private String kafkaPublisherId;
+
+    @org.springframework.beans.factory.annotation.Value("${kafka.event.topic}")
+    private String kafkaEventTopic;
+
+    private Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+
+    public static void main(String args[]) {
+        SpringApplication app = new SpringApplication(DirectoryMonitor.class);
+        app.setWebApplicationType(WebApplicationType.NONE);
+        app.run(args);
+    }
+
+    private void registerPathsRecursively(Path currentPath, int depth, 
WatchService watchService) throws IOException {
+        File[] children = currentPath.toAbsolutePath().toFile().listFiles();
+
+        if (depth < monitorDepth) {
+            for (File child : children) {
+                if (child.isDirectory()) {
+                    Path childPath = Paths.get(child.getAbsolutePath());
+                    WatchKey watchKey = childPath.register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE);
+                    logger.info("Registering path {}", 
childPath.toAbsolutePath().toString());
+                    watchKeyPathMap.put(watchKey, childPath);
+                    registerPathsRecursively(childPath, depth + 1, 
watchService);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+
+        EventNotifier eventNotifier = 
EventNotifier.EventNotifierBuilder.newBuilder()
+                .withTenantId(tenantId)
+                .withResourceType(resourceType)
+                .withHostName(hostName)
+                .withEventType(eventType)
+                .withAuthToken(authToken)
+                .withKafkaUrl(kafkaUrl)
+                .withKafkaPublisherId(kafkaPublisherId)
+                .withKafkaEventTopic(kafkaEventTopic).build();
+
+        SaturationGauge saturationGauge = new SaturationGauge();
+        saturationGauge.start(eventNotifier);
+
+        WatchService watchService = FileSystems.getDefault().newWatchService();
+        Path base = Paths.get(basePath);
+        watchKeyPathMap.put(base.register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE), base);
+
+        registerPathsRecursively(base, 0, watchService);
+
+        boolean poll = true;
+
+        while (poll) {
+            WatchKey key = watchService.take();
+            Path parentPath = watchKeyPathMap.get(key);
+            for (WatchEvent<?> event : key.pollEvents()) {
+
+                Path eventPath = parentPath.resolve((Path) event.context());
+                int eventDepth = base.relativize(eventPath).getNameCount();
+
+                logger.info("Event path " + 
eventPath.toAbsolutePath().toString() + ", Depth " + eventDepth);
+
+                if (eventDepth <= monitorDepth) {
+                    logger.info("Registering new path {}", 
eventPath.toAbsolutePath().toString());
+                    WatchKey newKey = eventPath.register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE);
+                    watchKeyPathMap.put(newKey, eventPath);
+                }
+
+                if (eventDepth == monitorDepth + 1 && 
eventPath.toAbsolutePath().toFile().isDirectory()) {
+                    logger.info("Detected a candidate directory {}", 
eventPath.toAbsolutePath().toString());
+                    
saturationGauge.monitorSaturation(eventPath.toAbsolutePath().toFile());
+                }
+                //File file = path.resolve((Path) event.context()).toFile();
+                //logger.info("Event : " + event.kind() + " Type : " + 
(file.isDirectory()? "Directory": "File") +  " Name : " + event.context());
+                //if (file.isDirectory()) {
+                    //saturationGauge.monitorSaturation(file);
+                //}
+            }
+            poll = key.reset();
+        }
+    }
+}
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/EventNotifier.java
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/EventNotifier.java
new file mode 100644
index 0000000..49e39be
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/EventNotifier.java
@@ -0,0 +1,100 @@
+package org.apache.airavata.datalake.dmonitor;
+
+import 
org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
+import 
org.apache.airavata.dataorchestrator.messaging.publisher.MessageProducer;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+public class EventNotifier {
+    private String tenantId;
+    private String resourceType;
+    private String hostName;
+    private String eventType;
+    private String authToken;
+    private String kafkaEventTopic;
+    private MessageProducer kafkaProducer;
+
+    public void notify(String resourcePath) throws Exception {
+        kafkaProducer.publish(kafkaEventTopic, Notification.newBuilder()
+                .setTenantId(tenantId)
+                
.setBasePath(Paths.get(resourcePath).getParent().toAbsolutePath()
+                        .getParent().toAbsolutePath().toString() + 
File.separator)
+                .setResourceType(resourceType)
+                .setHostName(hostName)
+                .setEventType(Notification.NotificationType.valueOf(eventType))
+                .setAuthToken(authToken)
+                .setResourcePath(resourcePath).build(), (metadata, exception) 
-> {});
+    }
+
+
+    public static final class EventNotifierBuilder {
+        private String tenantId;
+        private String resourceType;
+        private String hostName;
+        private String eventType;
+        private String authToken;
+        private String kafkaUrl;
+        private String kafkaPublisherId;
+        private String kafkaEventTopic;
+
+        private EventNotifierBuilder() {
+        }
+
+        public static EventNotifierBuilder newBuilder() {
+            return new EventNotifierBuilder();
+        }
+
+        public EventNotifierBuilder withTenantId(String tenantId) {
+            this.tenantId = tenantId;
+            return this;
+        }
+
+        public EventNotifierBuilder withResourceType(String resourceType) {
+            this.resourceType = resourceType;
+            return this;
+        }
+
+        public EventNotifierBuilder withHostName(String hostName) {
+            this.hostName = hostName;
+            return this;
+        }
+
+        public EventNotifierBuilder withEventType(String eventType) {
+            this.eventType = eventType;
+            return this;
+        }
+
+        public EventNotifierBuilder withAuthToken(String authToken) {
+            this.authToken = authToken;
+            return this;
+        }
+
+        public EventNotifierBuilder withKafkaUrl(String kafkaUrl) {
+            this.kafkaUrl = kafkaUrl;
+            return this;
+        }
+
+        public EventNotifierBuilder withKafkaPublisherId(String 
kafkaPublisherId) {
+            this.kafkaPublisherId = kafkaPublisherId;
+            return this;
+        }
+
+        public EventNotifierBuilder withKafkaEventTopic(String 
kakfaEventTopic) {
+            this.kafkaEventTopic = kakfaEventTopic;
+            return this;
+        }
+
+        public EventNotifier build() {
+            EventNotifier eventNotifier = new EventNotifier();
+            eventNotifier.resourceType = this.resourceType;
+            eventNotifier.authToken = this.authToken;
+            eventNotifier.hostName = this.hostName;
+            eventNotifier.kafkaEventTopic = this.kafkaEventTopic;
+            eventNotifier.eventType = this.eventType;
+            eventNotifier.tenantId = this.tenantId;
+            eventNotifier.kafkaProducer = new MessageProducer(kafkaUrl, 
kafkaPublisherId);
+            return eventNotifier;
+        }
+    }
+}
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/SaturationGauge.java
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/SaturationGauge.java
new file mode 100644
index 0000000..b409d17
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/SaturationGauge.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.dmonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+public class SaturationGauge {
+
+    private final static Logger logger = 
LoggerFactory.getLogger(SaturationGauge.class);
+
+    public Map<String, Long> directorySizes = new ConcurrentHashMap<>();
+    public Map<String, Integer> monitorCount = new ConcurrentHashMap<>();
+    final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    final ExecutorService monitoringService = Executors.newFixedThreadPool(10);
+
+    public void start(EventNotifier eventNotifier) {
+        scheduledExecutorService.scheduleWithFixedDelay(() -> {
+            List<Future<Boolean>> submitFutures = new ArrayList<>();
+            for (String key : directorySizes.keySet()) {
+                Future<Boolean> submitFuture = monitoringService.submit(() -> {
+
+                    monitorCount.put(key, monitorCount.get(key) + 1);
+
+                    try {
+                        long oldSize = directorySizes.get(key);
+                        long newSize = getFolderSize(new File(key));
+                        directorySizes.put(key, newSize);
+                        logger.info("Directory : " + key + " Size : " + 
newSize + " Scan count : " + monitorCount.get(key));
+
+                        if (oldSize == newSize && monitorCount.get(key) > 3) {
+                            logger.info("Directory " + key + " is saturated. 
Final size " + oldSize);
+                            monitorCount.remove(key);
+                            directorySizes.remove(key);
+                            eventNotifier.notify(key);
+                        }
+
+                        if (oldSize != newSize) {
+                            monitorCount.put(key, 0);
+                        }
+
+                        return true;
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        if (monitorCount.get(key) > 3) {
+                            monitorCount.remove(key);
+                            directorySizes.remove(key);
+                        }
+                        return false;
+                    }
+                });
+
+                submitFutures.add(submitFuture);
+            }
+
+            for (Future<Boolean> submitFuture: submitFutures) {
+                try {
+                    submitFuture.get();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            logger.debug("All monitor threads were completed");
+        }, 1, 20, TimeUnit.SECONDS);
+    }
+
+    public void monitorSaturation(File folder) {
+        if (! directorySizes.containsKey(folder.getAbsolutePath())) {
+            directorySizes.put(folder.getAbsolutePath(), 0L);
+            monitorCount.put(folder.getAbsolutePath(), 0);
+        }
+    }
+
+    private long getFolderSize(File folder) {
+        long length = 0;
+        File[] files = folder.listFiles();
+
+        int count = files.length;
+
+        for (int i = 0; i < count; i++) {
+            if (files[i].isFile()) {
+                length += files[i].length();
+            }
+            else {
+                length += getFolderSize(files[i]);
+            }
+        }
+        return length;
+    }
+}
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/application.properties
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/application.properties
new file mode 100644
index 0000000..bbc8097
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/application.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+#
+tenant.id=
+base.path=/tmp/airavata/
+monitor.depth=2
+resource.type=FOLDER
+host.name=
+event.type=MODIFY
+auth.token=
+kafka.url=
+kafka.publisher.id=evenpub
+kafka.event.topic=data-orchestrator-file-events
\ No newline at end of file
diff --git 
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/logback.xml
 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/logback.xml
new file mode 100644
index 0000000..3afe661
--- /dev/null
+++ 
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="LOGFILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <File>../logs/airavata.log</File>
+        <Append>true</Append>
+        <encoder>
+            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+        </encoder>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            
<fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="ch.qos.logback" level="WARN"/>
+    <logger name="org.apache.helix" level="WARN"/>
+    <logger name="org.apache.zookeeper" level="ERROR"/>
+    <logger name="org.apache.helix" level="ERROR"/>
+    <logger name="org.apache.airavata" level="INFO"/>
+    <logger name="org.hibernate" level="ERROR"/>
+    <logger name="net.schmizz.sshj" level="WARN"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="LOGFILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/data-orchestrator/data-orchestrator-clients/pom.xml 
b/data-orchestrator/data-orchestrator-clients/pom.xml
index ab1dbb0..6f0ee3f 100644
--- a/data-orchestrator/data-orchestrator-clients/pom.xml
+++ b/data-orchestrator/data-orchestrator-clients/pom.xml
@@ -1,4 +1,25 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
@@ -11,6 +32,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>data-orchestrator-clients-core</module>
+        <module>directory-monitor</module>
     </modules>
     <modelVersion>4.0.0</modelVersion>
 
diff --git 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 6246d85..9bd855d 100644
--- 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -179,9 +179,15 @@ public class OrchestratorEventProcessor implements 
Runnable {
                         notification.getResourceType());
                 logger.error("Resource should be a Folder type");
             }
+
             String removeBasePath = 
notification.getResourcePath().substring(notification.getBasePath().length());
             String[] splitted = removeBasePath.split("/");
 
+            if (splitted.length < 2) {
+                logger.error("Invalid path. Need at least two folder levels 
from base. {}", removeBasePath);
+                throw new Exception("Invalid path. Need at least two folder 
levels from base");
+            }
+
             String adminUser = splitted[0];
             String owner = splitted[1].split("_")[0];
 
diff --git 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
index 55a1166..2a019fb 100644
--- 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
+++ 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
@@ -43,12 +43,13 @@ message Notification {
 }
 
 message ReplicaLocation {
-    string baseDirectoryResourceId = 1;
+    string storageId = 1;
     enum ReplicaMode {
         ARCHIVE = 0;
         ORIGINAL = 1;
     }
     ReplicaMode replicaMode = 2;
+    string resourcePath = 3;
 }
 
 message NotificationStatus {
diff --git a/pom.xml b/pom.xml
index 24676ba..cba4a14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,8 +59,8 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.6.1</version>
                 <configuration>
-                    <source>11</source>
-                    <target>11</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                     <fork>true</fork>
                 </configuration>
             </plugin>

Reply via email to