This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new c665a66 Directory monitor to auto detect new directory creations
c665a66 is described below
commit c665a6607c0b7bb43755a018467a27c4d360c34e
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sat Oct 30 04:02:14 2021 -0400
Directory monitor to auto detect new directory creations
---
.../directory-monitor/pom.xml | 84 ++++++++++++
.../src/main/dist/bin/dir-monitor-daemon.sh | 113 ++++++++++++++++
.../src/main/dist/bin/dir-monitor.sh | 71 ++++++++++
.../directory-monitor/src/main/dist/bin/setenv.sh | 46 +++++++
.../src/main/dist/conf/application.properties | 28 ++++
.../src/main/dist/conf/logback.xml | 50 +++++++
.../src/main/dist/dir-monitor-bin-assembly.xml | 85 ++++++++++++
.../datalake/dmonitor/DirectoryMonitor.java | 145 +++++++++++++++++++++
.../airavata/datalake/dmonitor/EventNotifier.java | 100 ++++++++++++++
.../datalake/dmonitor/SaturationGauge.java | 111 ++++++++++++++++
.../src/main/resources/application.properties | 28 ++++
.../src/main/resources/logback.xml | 50 +++++++
.../data-orchestrator-clients/pom.xml | 22 ++++
.../handlers/async/OrchestratorEventProcessor.java | 6 +
.../src/main/proto/notification.proto | 3 +-
pom.xml | 4 +-
16 files changed, 943 insertions(+), 3 deletions(-)
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/pom.xml
b/data-orchestrator/data-orchestrator-clients/directory-monitor/pom.xml
new file mode 100644
index 0000000..265072d
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-clients/directory-monitor/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>data-orchestrator-clients</artifactId>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <version>0.01-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>directory-monitor</artifactId>
+
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <version>${spring.boot.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${log4j.over.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>data-orchestrator-messaging</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.plugin}</version>
+ <executions>
+ <execution>
+ <id>mft-controller-distribution-package</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <tarLongFileMode>posix</tarLongFileMode>
+
<finalName>Dir-Monitor-${project.version}</finalName>
+ <descriptors>
+
<descriptor>src/main/dist/dir-monitor-bin-assembly.xml</descriptor>
+ </descriptors>
+ <attach>false</attach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor-daemon.sh
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor-daemon.sh
new file mode 100644
index 0000000..2a549f6
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor-daemon.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+. `dirname $0`/setenv.sh
+# Capture user's working dir before changing directory
+CWD="$PWD"
+cd ${AIRAVATA_HOME}/bin
+LOGO_FILE="logo.txt"
+
+JAVA_OPTS="-Dspring.config.location=${AIRAVATA_HOME}/conf/
-Dairavata.home=${AIRAVATA_HOME}
-Dlogback.configurationFile=file:${AIRAVATA_HOME}/conf/logback.xml"
+AIRAVATA_COMMAND=""
+EXTRA_ARGS=""
+SERVERS=""
+LOGO=true
+IS_SUBSET=false
+SUBSET=""
+DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/airavata-daemon.out"
+LOG_FILE=$DEFAULT_LOG_FILE
+
+SERVICE_NAME="Directory Monitor"
+PID_PATH_NAME="${AIRAVATA_HOME}/bin/service-pid"
+
+case $1 in
+ start)
+ echo "Starting $SERVICE_NAME ..."
+ if [ ! -f $PID_PATH_NAME ]; then
+ nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
+ org.apache.airavata.datalake.dmonitor.DirectoryMonitor
${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+ echo $! > $PID_PATH_NAME
+ echo "$SERVICE_NAME started ..."
+ else
+ echo "$SERVICE_NAME is already running ..."
+ fi
+ ;;
+ stop)
+ if [ -f $PID_PATH_NAME ]; then
+ PID=$(cat $PID_PATH_NAME);
+ echo "$SERVICE_NAME stoping ..."
+ kill $PID;
+ RETRY=0
+ while kill -0 $PID 2> /dev/null; do
+ echo "Waiting for the process $PID to be stopped"
+ RETRY=`expr ${RETRY} + 1`
+ if [ "${RETRY}" -gt "20" ]
+ then
+ echo "Forcefully killing the process as it is not
responding ..."
+ kill -9 $PID
+ fi
+ sleep 1
+ done
+ echo "$SERVICE_NAME stopped ..."
+ rm $PID_PATH_NAME
+ else
+ echo "$SERVICE_NAME is not running ..."
+ fi
+ ;;
+ restart)
+ if [ -f $PID_PATH_NAME ]; then
+ PID=$(cat $PID_PATH_NAME);
+ echo "$SERVICE_NAME stopping ...";
+ kill $PID;
+ RETRY=0
+ while kill -0 $PID 2> /dev/null; do
+ echo "Waiting for the process $PID to be stopped"
+ RETRY=`expr ${RETRY} + 1`
+ if [ "${RETRY}" -gt "20" ]
+ then
+ echo "Forcefully killing the process as it is not
responding ..."
+ kill -9 $PID
+ fi
+ sleep 1
+ done
+ echo "$SERVICE_NAME stopped ...";
+ rm $PID_PATH_NAME
+ echo "$SERVICE_NAME starting ..."
+ nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
+ org.apache.airavata.datalake.dmonitor.DirectoryMonitor
${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+ echo $! > $PID_PATH_NAME
+ echo "$SERVICE_NAME started ..."
+ else
+ echo "$SERVICE_NAME is not running ..."
+ fi
+ ;;
+ -h)
+ echo "Usage: dir-monitor-daemon.sh"
+
+ echo "command options:"
+ echo " start Start server in daemon mode"
+ echo " stop Stop server running in daemon mode"
+ echo " restart Restart server in daemon mode"
+ echo " -log <LOG_FILE> Where to redirect stdout/stderr
(defaults to $DEFAULT_LOG_FILE)"
+ echo " -h Display this help and exit"
+ shift
+ exit 0
+ ;;
+esac
+
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor.sh
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor.sh
new file mode 100644
index 0000000..902d6ea
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/dir-monitor.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+. `dirname $0`/setenv.sh
+# Capture user's working dir before changing directory
+CWD="$PWD"
+cd ${AIRAVATA_HOME}/bin
+LOGO_FILE="logo.txt"
+
+JAVA_OPTS="-Dspring.config.location=${AIRAVATA_HOME}/conf/
-Dairavata.home=${AIRAVATA_HOME}
-Dlogback.configurationFile=file:${AIRAVATA_HOME}/conf/logback.xml"
+AIRAVATA_COMMAND=""
+EXTRA_ARGS=""
+SERVERS=""
+IS_SUBSET=false
+SUBSET=""
+DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/console.out"
+LOG_FILE=$DEFAULT_LOG_FILE
+
+# parse command arguments
+for var in "$@"
+do
+ case ${var} in
+ -xdebug)
+ AIRAVATA_COMMAND="${AIRAVATA_COMMAND}"
+ JAVA_OPTS="$JAVA_OPTS -Xdebug -Xnoagent
-Xrunjdwp:transport=dt_socket,server=y,address=*:8000"
+ shift
+ ;;
+ -log)
+ shift
+ LOG_FILE="$1"
+ shift
+ # If relative path, expand to absolute path using the user's $CWD
+ if [ -z "`echo "$LOG_FILE" | egrep "^/"`" ]; then
+ LOG_FILE="${CWD}/${LOG_FILE}"
+ fi
+ ;;
+ -h)
+ echo "Usage: dir-monitor.sh"
+
+ echo "command options:"
+ echo " -xdebug Start Directory Monitor under JPDA
debugger"
+ echo " -h Display this help and exit"
+ shift
+ exit 0
+ ;;
+ *)
+ EXTRA_ARGS="${EXTRA_ARGS} ${var}"
+ shift
+ ;;
+ esac
+done
+
+java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
+ org.apache.airavata.datalake.dmonitor.DirectoryMonitor ${AIRAVATA_COMMAND}
$*
+
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/setenv.sh
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/setenv.sh
new file mode 100755
index 0000000..9e894e1
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/bin/setenv.sh
@@ -0,0 +1,46 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '.*/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRGDIR=`dirname "$PRG"`
+
+# Only set AIRAVATA_HOME if not already set
+[ -z "$AIRAVATA_HOME" ] && AIRAVATA_HOME=`cd "$PRGDIR/.." ; pwd`
+
+AIRAVATA_CLASSPATH=""
+
+for f in "$AIRAVATA_HOME"/lib/*.jar
+do
+ AIRAVATA_CLASSPATH="$AIRAVATA_CLASSPATH":$f
+done
+
+export AIRAVATA_HOME
+export AIRAVATA_CLASSPATH
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/application.properties
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/application.properties
new file mode 100644
index 0000000..67b7778
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/application.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+tenant.id=
+base.path=/images/GatewayTests/workdir
+monitor.depth=2
+resource.type=FOLDER
+host.name=
+event.type=MODIFY
+auth.token=
+kafka.url=
+kafka.publisher.id=evenpub
+kafka.event.topic=data-orchestrator-file-events
\ No newline at end of file
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/logback.xml
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/logback.xml
new file mode 100644
index 0000000..3afe661
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/conf/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="LOGFILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <File>../logs/airavata.log</File>
+ <Append>true</Append>
+ <encoder>
+ <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+
<fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>1GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="ch.qos.logback" level="WARN"/>
+ <logger name="org.apache.helix" level="WARN"/>
+ <logger name="org.apache.zookeeper" level="ERROR"/>
+ <logger name="org.apache.helix" level="ERROR"/>
+ <logger name="org.apache.airavata" level="INFO"/>
+ <logger name="org.hibernate" level="ERROR"/>
+ <logger name="net.schmizz.sshj" level="WARN"/>
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="LOGFILE"/>
+ </root>
+</configuration>
\ No newline at end of file
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/dir-monitor-bin-assembly.xml
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/dir-monitor-bin-assembly.xml
new file mode 100644
index 0000000..743414b
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/dist/dir-monitor-bin-assembly.xml
@@ -0,0 +1,85 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!DOCTYPE assembly [
+ <!ELEMENT assembly
(id|includeBaseDirectory|baseDirectory|formats|fileSets|dependencySets)*>
+ <!ELEMENT id (#PCDATA)>
+ <!ELEMENT includeBaseDirectory (#PCDATA)>
+ <!ELEMENT baseDirectory (#PCDATA)>
+ <!ELEMENT formats (format)*>
+ <!ELEMENT format (#PCDATA)>
+ <!ELEMENT fileSets (fileSet)*>
+ <!ELEMENT fileSet (directory|outputDirectory|fileMode|includes)*>
+ <!ELEMENT directory (#PCDATA)>
+ <!ELEMENT outputDirectory (#PCDATA)>
+ <!ELEMENT includes (include)*>
+ <!ELEMENT include (#PCDATA)>
+ <!ELEMENT dependencySets (dependencySet)*>
+ <!ELEMENT dependencySet
(outputDirectory|outputFileNameMapping|includes)*>
+ ]>
+<assembly>
+
+ <id>bin</id>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <baseDirectory>Dir-Monitor-${project.version}</baseDirectory>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/dist/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>777</fileMode>
+ <includes>
+ <include>*.sh</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/dist/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>application.properties</include>
+ <include>logback.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>logs</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ </fileSet>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>false</useProjectArtifact>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>*</include>
+ </includes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/DirectoryMonitor.java
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/DirectoryMonitor.java
new file mode 100644
index 0000000..fe8a7cb
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/DirectoryMonitor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.dmonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.*;
+import java.util.HashMap;
+import java.util.Map;
+
+@SpringBootApplication()
+public class DirectoryMonitor implements CommandLineRunner {
+
+ private final static Logger logger =
LoggerFactory.getLogger(DirectoryMonitor.class);
+
+ @org.springframework.beans.factory.annotation.Value("${tenant.id}")
+ private String tenantId;
+
+ @org.springframework.beans.factory.annotation.Value("${base.path}")
+ private String basePath;
+
+ @org.springframework.beans.factory.annotation.Value("${monitor.depth}")
+ private int monitorDepth;
+
+ @org.springframework.beans.factory.annotation.Value("${resource.type}")
+ private String resourceType;
+
+ @org.springframework.beans.factory.annotation.Value("${host.name}")
+ private String hostName;
+
+ @org.springframework.beans.factory.annotation.Value("${event.type}")
+ private String eventType;
+
+ @org.springframework.beans.factory.annotation.Value("${auth.token}")
+ private String authToken;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.url}")
+ private String kafkaUrl;
+
+
@org.springframework.beans.factory.annotation.Value("${kafka.publisher.id}")
+ private String kafkaPublisherId;
+
+ @org.springframework.beans.factory.annotation.Value("${kafka.event.topic}")
+ private String kafkaEventTopic;
+
+ private Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+
+ public static void main(String args[]) {
+ SpringApplication app = new SpringApplication(DirectoryMonitor.class);
+ app.setWebApplicationType(WebApplicationType.NONE);
+ app.run(args);
+ }
+
+ private void registerPathsRecursively(Path currentPath, int depth,
WatchService watchService) throws IOException {
+ File[] children = currentPath.toAbsolutePath().toFile().listFiles();
+
+ if (depth < monitorDepth) {
+ for (File child : children) {
+ if (child.isDirectory()) {
+ Path childPath = Paths.get(child.getAbsolutePath());
+ WatchKey watchKey = childPath.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE);
+ logger.info("Registering path {}",
childPath.toAbsolutePath().toString());
+ watchKeyPathMap.put(watchKey, childPath);
+ registerPathsRecursively(childPath, depth + 1,
watchService);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+
+ EventNotifier eventNotifier =
EventNotifier.EventNotifierBuilder.newBuilder()
+ .withTenantId(tenantId)
+ .withResourceType(resourceType)
+ .withHostName(hostName)
+ .withEventType(eventType)
+ .withAuthToken(authToken)
+ .withKafkaUrl(kafkaUrl)
+ .withKafkaPublisherId(kafkaPublisherId)
+ .withKafkaEventTopic(kafkaEventTopic).build();
+
+ SaturationGauge saturationGauge = new SaturationGauge();
+ saturationGauge.start(eventNotifier);
+
+ WatchService watchService = FileSystems.getDefault().newWatchService();
+ Path base = Paths.get(basePath);
+ watchKeyPathMap.put(base.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE), base);
+
+ registerPathsRecursively(base, 0, watchService);
+
+ boolean poll = true;
+
+ while (poll) {
+ WatchKey key = watchService.take();
+ Path parentPath = watchKeyPathMap.get(key);
+ for (WatchEvent<?> event : key.pollEvents()) {
+
+ Path eventPath = parentPath.resolve((Path) event.context());
+ int eventDepth = base.relativize(eventPath).getNameCount();
+
+ logger.info("Event path " +
eventPath.toAbsolutePath().toString() + ", Depth " + eventDepth);
+
+ if (eventDepth <= monitorDepth) {
+ logger.info("Registering new path {}",
eventPath.toAbsolutePath().toString());
+ WatchKey newKey = eventPath.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE);
+ watchKeyPathMap.put(newKey, eventPath);
+ }
+
+ if (eventDepth == monitorDepth + 1 &&
eventPath.toAbsolutePath().toFile().isDirectory()) {
+ logger.info("Detected a candidate directory {}",
eventPath.toAbsolutePath().toString());
+
saturationGauge.monitorSaturation(eventPath.toAbsolutePath().toFile());
+ }
+ //File file = path.resolve((Path) event.context()).toFile();
+ //logger.info("Event : " + event.kind() + " Type : " +
(file.isDirectory()? "Directory": "File") + " Name : " + event.context());
+ //if (file.isDirectory()) {
+ //saturationGauge.monitorSaturation(file);
+ //}
+ }
+ poll = key.reset();
+ }
+ }
+}
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/EventNotifier.java
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/EventNotifier.java
new file mode 100644
index 0000000..49e39be
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/EventNotifier.java
@@ -0,0 +1,100 @@
+package org.apache.airavata.datalake.dmonitor;
+
+import
org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
+import
org.apache.airavata.dataorchestrator.messaging.publisher.MessageProducer;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+public class EventNotifier {
+ private String tenantId;
+ private String resourceType;
+ private String hostName;
+ private String eventType;
+ private String authToken;
+ private String kafkaEventTopic;
+ private MessageProducer kafkaProducer;
+
+ public void notify(String resourcePath) throws Exception {
+ kafkaProducer.publish(kafkaEventTopic, Notification.newBuilder()
+ .setTenantId(tenantId)
+
.setBasePath(Paths.get(resourcePath).getParent().toAbsolutePath()
+ .getParent().toAbsolutePath().toString() +
File.separator)
+ .setResourceType(resourceType)
+ .setHostName(hostName)
+ .setEventType(Notification.NotificationType.valueOf(eventType))
+ .setAuthToken(authToken)
+ .setResourcePath(resourcePath).build(), (metadata, exception)
-> {});
+ }
+
+
+ public static final class EventNotifierBuilder {
+ private String tenantId;
+ private String resourceType;
+ private String hostName;
+ private String eventType;
+ private String authToken;
+ private String kafkaUrl;
+ private String kafkaPublisherId;
+ private String kafkaEventTopic;
+
+ private EventNotifierBuilder() {
+ }
+
+ public static EventNotifierBuilder newBuilder() {
+ return new EventNotifierBuilder();
+ }
+
+ public EventNotifierBuilder withTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ return this;
+ }
+
+ public EventNotifierBuilder withResourceType(String resourceType) {
+ this.resourceType = resourceType;
+ return this;
+ }
+
+ public EventNotifierBuilder withHostName(String hostName) {
+ this.hostName = hostName;
+ return this;
+ }
+
+ public EventNotifierBuilder withEventType(String eventType) {
+ this.eventType = eventType;
+ return this;
+ }
+
+ public EventNotifierBuilder withAuthToken(String authToken) {
+ this.authToken = authToken;
+ return this;
+ }
+
+ public EventNotifierBuilder withKafkaUrl(String kafkaUrl) {
+ this.kafkaUrl = kafkaUrl;
+ return this;
+ }
+
+ public EventNotifierBuilder withKafkaPublisherId(String
kafkaPublisherId) {
+ this.kafkaPublisherId = kafkaPublisherId;
+ return this;
+ }
+
+ public EventNotifierBuilder withKafkaEventTopic(String
kakfaEventTopic) {
+ this.kafkaEventTopic = kakfaEventTopic;
+ return this;
+ }
+
+ public EventNotifier build() {
+ EventNotifier eventNotifier = new EventNotifier();
+ eventNotifier.resourceType = this.resourceType;
+ eventNotifier.authToken = this.authToken;
+ eventNotifier.hostName = this.hostName;
+ eventNotifier.kafkaEventTopic = this.kafkaEventTopic;
+ eventNotifier.eventType = this.eventType;
+ eventNotifier.tenantId = this.tenantId;
+ eventNotifier.kafkaProducer = new MessageProducer(kafkaUrl,
kafkaPublisherId);
+ return eventNotifier;
+ }
+ }
+}
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/SaturationGauge.java
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/SaturationGauge.java
new file mode 100644
index 0000000..b409d17
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/java/org/apache/airavata/datalake/dmonitor/SaturationGauge.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.dmonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+public class SaturationGauge {
+
+ private final static Logger logger =
LoggerFactory.getLogger(SaturationGauge.class);
+
+ public Map<String, Long> directorySizes = new ConcurrentHashMap<>();
+ public Map<String, Integer> monitorCount = new ConcurrentHashMap<>();
+ final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+ final ExecutorService monitoringService = Executors.newFixedThreadPool(10);
+
+ public void start(EventNotifier eventNotifier) {
+ scheduledExecutorService.scheduleWithFixedDelay(() -> {
+ List<Future<Boolean>> submitFutures = new ArrayList<>();
+ for (String key : directorySizes.keySet()) {
+ Future<Boolean> submitFuture = monitoringService.submit(() -> {
+
+ monitorCount.put(key, monitorCount.get(key) + 1);
+
+ try {
+ long oldSize = directorySizes.get(key);
+ long newSize = getFolderSize(new File(key));
+ directorySizes.put(key, newSize);
+ logger.info("Directory : " + key + " Size : " +
newSize + " Scan count : " + monitorCount.get(key));
+
+ if (oldSize == newSize && monitorCount.get(key) > 3) {
+ logger.info("Directory " + key + " is saturated.
Final size " + oldSize);
+ monitorCount.remove(key);
+ directorySizes.remove(key);
+ eventNotifier.notify(key);
+ }
+
+ if (oldSize != newSize) {
+ monitorCount.put(key, 0);
+ }
+
+ return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (monitorCount.get(key) > 3) {
+ monitorCount.remove(key);
+ directorySizes.remove(key);
+ }
+ return false;
+ }
+ });
+
+ submitFutures.add(submitFuture);
+ }
+
+ for (Future<Boolean> submitFuture: submitFutures) {
+ try {
+ submitFuture.get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ logger.debug("All monitor threads were completed");
+ }, 1, 20, TimeUnit.SECONDS);
+ }
+
+ public void monitorSaturation(File folder) {
+ if (! directorySizes.containsKey(folder.getAbsolutePath())) {
+ directorySizes.put(folder.getAbsolutePath(), 0L);
+ monitorCount.put(folder.getAbsolutePath(), 0);
+ }
+ }
+
+ private long getFolderSize(File folder) {
+ long length = 0;
+ File[] files = folder.listFiles();
+
+ int count = files.length;
+
+ for (int i = 0; i < count; i++) {
+ if (files[i].isFile()) {
+ length += files[i].length();
+ }
+ else {
+ length += getFolderSize(files[i]);
+ }
+ }
+ return length;
+ }
+}
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/application.properties
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/application.properties
new file mode 100644
index 0000000..bbc8097
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/application.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+tenant.id=
+base.path=/tmp/airavata/
+monitor.depth=2
+resource.type=FOLDER
+host.name=
+event.type=MODIFY
+auth.token=
+kafka.url=
+kafka.publisher.id=evenpub
+kafka.event.topic=data-orchestrator-file-events
\ No newline at end of file
diff --git
a/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/logback.xml
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/logback.xml
new file mode 100644
index 0000000..3afe661
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-clients/directory-monitor/src/main/resources/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="LOGFILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <File>../logs/airavata.log</File>
+ <Append>true</Append>
+ <encoder>
+ <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+
<fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>1GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="ch.qos.logback" level="WARN"/>
+ <logger name="org.apache.helix" level="WARN"/>
+ <logger name="org.apache.zookeeper" level="ERROR"/>
+ <logger name="org.apache.helix" level="ERROR"/>
+ <logger name="org.apache.airavata" level="INFO"/>
+ <logger name="org.hibernate" level="ERROR"/>
+ <logger name="net.schmizz.sshj" level="WARN"/>
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="LOGFILE"/>
+ </root>
+</configuration>
\ No newline at end of file
diff --git a/data-orchestrator/data-orchestrator-clients/pom.xml
b/data-orchestrator/data-orchestrator-clients/pom.xml
index ab1dbb0..6f0ee3f 100644
--- a/data-orchestrator/data-orchestrator-clients/pom.xml
+++ b/data-orchestrator/data-orchestrator-clients/pom.xml
@@ -1,4 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -11,6 +32,7 @@
<packaging>pom</packaging>
<modules>
<module>data-orchestrator-clients-core</module>
+ <module>directory-monitor</module>
</modules>
<modelVersion>4.0.0</modelVersion>
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 6246d85..9bd855d 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -179,9 +179,15 @@ public class OrchestratorEventProcessor implements
Runnable {
notification.getResourceType());
logger.error("Resource should be a Folder type");
}
+
String removeBasePath =
notification.getResourcePath().substring(notification.getBasePath().length());
String[] splitted = removeBasePath.split("/");
+ if (splitted.length < 2) {
+ logger.error("Invalid path. Need at least two folder levels
from base. {}", removeBasePath);
+ throw new Exception("Invalid path. Need at least two folder
levels from base");
+ }
+
String adminUser = splitted[0];
String owner = splitted[1].split("_")[0];
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
index 55a1166..2a019fb 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
@@ -43,12 +43,13 @@ message Notification {
}
message ReplicaLocation {
- string baseDirectoryResourceId = 1;
+ string storageId = 1;
enum ReplicaMode {
ARCHIVE = 0;
ORIGINAL = 1;
}
ReplicaMode replicaMode = 2;
+ string resourcePath = 3;
}
message NotificationStatus {
diff --git a/pom.xml b/pom.xml
index 24676ba..cba4a14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,8 +59,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
- <source>11</source>
- <target>11</target>
+ <source>1.8</source>
+ <target>1.8</target>
<fork>true</fork>
</configuration>
</plugin>