Implemented gfac worker
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2654de09 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2654de09 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2654de09 Branch: refs/heads/master Commit: 2654de094e76a937643b2ea3ca9fff5efa88c50e Parents: 8535ff1 Author: Shameera Rathanyaka <[email protected]> Authored: Fri Jun 12 14:40:34 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Fri Jun 12 14:40:34 2015 -0400 ---------------------------------------------------------------------- .../airavata/common/logger/AiravataLogger.java | 677 ------------------- .../common/logger/AiravataLoggerFactory.java | 34 - .../common/logger/AiravataLoggerImpl.java | 323 --------- .../airavata/common/utils/ServerSettings.java | 22 + .../main/resources/airavata-server.properties | 4 +- .../org/apache/airavata/gfac/core/GFac.java | 36 +- .../gfac/core/GFacThreadPoolExecutor.java | 6 +- .../apache/airavata/gfac/core/GFacUtils.java | 5 + .../apache/airavata/gfac/core/GFacWorker.java | 37 - .../airavata/gfac/impl/BetterGfacImpl.java | 1 + .../apache/airavata/gfac/impl/GFacEngine.java | 53 ++ .../org/apache/airavata/gfac/impl/GFacImpl.java | 28 - .../apache/airavata/gfac/impl/GFacWorker.java | 87 +++ .../airavata/gfac/server/GfacServerHandler.java | 95 ++- 14 files changed, 234 insertions(+), 1174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java b/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java deleted file mode 100644 index 9477cc9..0000000 --- 
a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLogger.java +++ /dev/null @@ -1,677 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.common.logger; - -public interface AiravataLogger{ - - /** - * Return the name of this <code>Logger</code> instance. - * - * @return name of this logger instance - */ - String getName(); - - /** - * Is the logger instance enabled for the TRACE level? - * - * @return True if this Logger is enabled for the TRACE level, - * false otherwise. - * @since 1.4 - */ - boolean isTraceEnabled(); - - /** - * Log a message at the TRACE level. - * - * @param msg the message string to be logged - * @since 1.4 - */ - void trace(String msg); - - /** - * Log a message at the TRACE level. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message string to be logged - * @since 1.4 - */ - void traceId(String etjId, String msg); - - /** - * Log a message at the TRACE level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the TRACE level. 
</p> - * - * @param format the format string - * @param arg the argument - * @since 1.4 - */ - void trace(String format, Object arg); - - /** - * Log a message at the TRACE level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the TRACE level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg the argument - * @since 1.4 - */ - void traceId(String etjId, String format, Object arg); - - /** - * Log a message at the TRACE level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the TRACE level. </p> - * - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - * @since 1.4 - */ - void trace(String format, Object arg1, Object arg2); - - /** - * Log a message at the TRACE level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the TRACE level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - * @since 1.4 - */ - void traceId(String etjId, String format, Object arg1, Object arg2); - - /** - * Log a message at the TRACE level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the TRACE level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for TRACE. 
The variants taking {@link #trace(String, Object) one} and - * {@link #trace(String, Object, Object) two} arguments exist solely in order to avoid this hidden cost.</p> - * - * @param format the format string - * @param arguments a list of 3 or more arguments - * @since 1.4 - */ - void trace(String format, Object... arguments); - - /** - * Log a message at the TRACE level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the TRACE level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for TRACE. The variants taking {@link #trace(String, Object) one} and - * {@link #trace(String, Object, Object) two} arguments exist solely in order to avoid this hidden cost.</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arguments a list of 3 or more arguments - * @since 1.4 - */ - void traceId(String etjId, String format, Object... arguments); - - /** - * Log an exception (throwable) at the TRACE level with an - * accompanying message. - * - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - * @since 1.4 - */ - void trace(String msg, Throwable t); - - /** - * Log an exception (throwable) at the TRACE level with an - * accompanying message. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - * @since 1.4 - */ - void traceId(String etjId, String msg, Throwable t); - - /** - * Is the logger instance enabled for the DEBUG level? - * - * @return True if this Logger is enabled for the DEBUG level, - * false otherwise. - */ - boolean isDebugEnabled(); - - /** - * Log a message at the DEBUG level. 
- * - * @param msg the message string to be logged - */ - void debug(String msg); - - /** - * Log a message at the DEBUG level. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message string to be logged - */ - void debugId(String etjId, String msg); - - /** - * Log a message at the DEBUG level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the DEBUG level. </p> - * - * @param format the format string - * @param arg the argument - */ - void debug(String format, Object arg); - - /** - * Log a message at the DEBUG level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the DEBUG level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg the argument - */ - void debugId(String etjId, String format, Object arg); - - /** - * Log a message at the DEBUG level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the DEBUG level. </p> - * - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void debug(String format, Object arg1, Object arg2); - - /** - * Log a message at the DEBUG level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the DEBUG level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void debugId(String etjId, String format, Object arg1, Object arg2); - - /** - * Log a message at the DEBUG level according to the specified format - * and arguments. 
- * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the DEBUG level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for DEBUG. The variants taking - * {@link #debug(String, Object) one} and {@link #debug(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void debug(String format, Object... arguments); - - /** - * Log a message at the DEBUG level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the DEBUG level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for DEBUG. The variants taking - * {@link #debug(String, Object) one} and {@link #debug(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void debugId(String etjId, String format, Object... arguments); - - /** - * Log an exception (throwable) at the DEBUG level with an - * accompanying message. - * - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void debug(String msg, Throwable t); - - /** - * Log an exception (throwable) at the DEBUG level with an - * accompanying message. 
- * - * @param etjId - Experiment , Task or Job Id - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void debugId(String etjId, String msg, Throwable t); - - /** - * Is the logger instance enabled for the INFO level? - * - * @return True if this Logger is enabled for the INFO level, - * false otherwise. - */ - boolean isInfoEnabled(); - - /** - * Log a message at the INFO level. - * - * @param msg the message string to be logged - */ - void info(String msg); - - /** - * Log a message at the INFO level. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message string to be logged - */ - void infoId(String etjId, String msg); - - /** - * Log a message at the INFO level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the INFO level. </p> - * - * @param format the format string - * @param arg the argument - */ - void info(String format, Object arg); - - /** - * Log a message at the INFO level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the INFO level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg the argument - */ - void infoId(String etjId, String format, Object arg); - - /** - * Log a message at the INFO level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the INFO level. </p> - * - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void info(String format, Object arg1, Object arg2); - - /** - * Log a message at the INFO level according to the specified format - * and arguments. 
- * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the INFO level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void infoId(String etjId, String format, Object arg1, Object arg2); - - /** - * Log a message at the INFO level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the INFO level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for INFO. The variants taking - * {@link #info(String, Object) one} and {@link #info(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void info(String format, Object... arguments); - - /** - * Log a message at the INFO level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the INFO level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for INFO. The variants taking - * {@link #info(String, Object) one} and {@link #info(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void infoId(String etjId, String format, Object... arguments); - - /** - * Log an exception (throwable) at the INFO level with an - * accompanying message. 
- * - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void info(String msg, Throwable t); - - /** - * Log an exception (throwable) at the INFO level with an - * accompanying message. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void infoId(String etjId, String msg, Throwable t); - - /** - * Is the logger instance enabled for the WARN level? - * - * @return True if this Logger is enabled for the WARN level, - * false otherwise. - */ - boolean isWarnEnabled(); - - /** - * Log a message at the WARN level. - * - * @param msg the message string to be logged - */ - void warn(String msg); - - /** - * Log a message at the WARN level. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message string to be logged - */ - void warnId(String etjId, String msg); - - /** - * Log a message at the WARN level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the WARN level. </p> - * - * @param format the format string - * @param arg the argument - */ - void warn(String format, Object arg); - - /** - * Log a message at the WARN level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the WARN level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg the argument - */ - void warnId(String etjId, String format, Object arg); - - /** - * Log a message at the WARN level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the WARN level. 
However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for WARN. The variants taking - * {@link #warn(String, Object) one} and {@link #warn(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void warn(String format, Object... arguments); - - /** - * Log a message at the WARN level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the WARN level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for WARN. The variants taking - * {@link #warn(String, Object) one} and {@link #warn(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void warnId(String etjId, String format, Object... arguments); - - /** - * Log a message at the WARN level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the WARN level. </p> - * - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void warn(String format, Object arg1, Object arg2); - - /** - * Log a message at the WARN level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the WARN level. 
</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void warnId(String etjId, String format, Object arg1, Object arg2); - - /** - * Log an exception (throwable) at the WARN level with an - * accompanying message. - * - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void warn(String msg, Throwable t); - - /** - * Log an exception (throwable) at the WARN level with an - * accompanying message. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void warnId(String etjId, String msg, Throwable t); - - /** - * Is the logger instance enabled for the ERROR level? - * - * @return True if this Logger is enabled for the ERROR level, - * false otherwise. - */ - boolean isErrorEnabled(); - - /** - * Log a message at the ERROR level. - * - * @param msg the message string to be logged - */ - void error(String msg); - - /** - * Log a message at the ERROR level. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message string to be logged - */ - void errorId(String etjId, String msg); - - /** - * Log a message at the ERROR level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the ERROR level. </p> - * - * @param format the format string - * @param arg the argument - */ - void error(String format, Object arg); - - /** - * Log a message at the ERROR level according to the specified format - * and argument. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the ERROR level. 
</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg the argument - */ - void errorId(String etjId, String format, Object arg); - - /** - * Log a message at the ERROR level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the ERROR level. </p> - * - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void error(String format, Object arg1, Object arg2); - - /** - * Log a message at the ERROR level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous object creation when the logger - * is disabled for the ERROR level. </p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arg1 the first argument - * @param arg2 the second argument - */ - void errorId(String etjId, String format, Object arg1, Object arg2); - - /** - * Log a message at the ERROR level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the ERROR level. However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for ERROR. The variants taking - * {@link #error(String, Object) one} and {@link #error(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void error(String format, Object... arguments); - - /** - * Log a message at the ERROR level according to the specified format - * and arguments. - * <p/> - * <p>This form avoids superfluous string concatenation when the logger - * is disabled for the ERROR level. 
However, this variant incurs the hidden - * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, - * even if this logger is disabled for ERROR. The variants taking - * {@link #error(String, Object) one} and {@link #error(String, Object, Object) two} - * arguments exist solely in order to avoid this hidden cost.</p> - * - * @param etjId - Experiment , Task or Job Id - * @param format the format string - * @param arguments a list of 3 or more arguments - */ - void errorId(String etjId, String format, Object... arguments); - - /** - * Log an exception (throwable) at the ERROR level with an - * accompanying message. - * - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void error(String msg, Throwable t); - - /** - * Log an exception (throwable) at the ERROR level with an - * accompanying message. - * - * @param etjId - Experiment , Task or Job Id - * @param msg the message accompanying the exception - * @param t the exception (throwable) to log - */ - void errorId(String etjId, String msg, Throwable t); - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java b/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java deleted file mode 100644 index a1a9462..0000000 --- a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.common.logger; - -public class AiravataLoggerFactory { - - public static AiravataLogger getLogger(Class aClass) { - return new AiravataLoggerImpl(aClass); - } - - public static AiravataLogger getLogger(String className) { - return new AiravataLoggerImpl(className); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java b/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java deleted file mode 100644 index 74ab401..0000000 --- a/modules/commons/src/main/java/org/apache/airavata/common/logger/AiravataLoggerImpl.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.common.logger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AiravataLoggerImpl implements AiravataLogger{ - - private Logger logger; - - public AiravataLoggerImpl(Class aClass) { - logger = LoggerFactory.getLogger(aClass); - } - - public AiravataLoggerImpl(String className) { - logger = LoggerFactory.getLogger(className); - } - - - @Override - public String getName() { - return logger.getName(); - } - - @Override - public boolean isTraceEnabled() { - return logger.isTraceEnabled(); - } - - @Override - public void trace(String msg) { - logger.trace(msg); - } - - @Override - public void traceId(String etjId, String msg) { - logger.trace(getAiravataLogMessage(etjId, msg)); - } - - @Override - public void trace(String format, Object arg) { - logger.trace(format, arg); - } - - @Override - public void traceId(String etjId, String format, Object arg) { - logger.trace(getAiravataLogMessage(etjId, format), arg); - } - - @Override - public void trace(String format, Object arg1, Object arg2) { - logger.trace(format, arg1, arg2); - } - - @Override - public void traceId(String etjId, String format, Object arg1, Object arg2) { - logger.trace(getAiravataLogMessage(etjId,format), arg1, arg2); - } - - @Override - public void trace(String format, Object... arguments) { - logger.trace(format, arguments); - } - - @Override - public void traceId(String etjId, String format, Object... 
arguments) { - logger.trace(getAiravataLogMessage(etjId, format), arguments); - } - - @Override - public void trace(String msg, Throwable t) { - logger.trace(msg, t); - } - - @Override - public void traceId(String etjId, String msg, Throwable t) { - logger.trace(getAiravataLogMessage(etjId, msg), t); - } - - @Override - public boolean isDebugEnabled() { - return logger.isDebugEnabled(); - } - - @Override - public void debug(String msg) { - logger.debug(msg); - } - - @Override - public void debugId(String etjId, String msg) { - logger.debug(getAiravataLogMessage(etjId, msg)); - } - - @Override - public void debug(String format, Object arg) { - logger.debug(format, arg); - } - - @Override - public void debugId(String etjId, String format, Object arg) { - logger.debug(getAiravataLogMessage(etjId, format), arg); - } - - @Override - public void debug(String format, Object arg1, Object arg2) { - logger.debug(format, arg1, arg2); - } - - @Override - public void debugId(String etjId, String format, Object arg1, Object arg2) { - logger.debug(getAiravataLogMessage(etjId, format), arg1, arg2); - } - - @Override - public void debug(String format, Object... arguments) { - logger.debug(format, arguments); - } - - @Override - public void debugId(String etjId, String format, Object... 
arguments) { - logger.debug(getAiravataLogMessage(etjId, format), arguments); - } - - @Override - public void debug(String msg, Throwable t) { - logger.debug(msg, t); - } - - @Override - public void debugId(String etjId, String msg, Throwable t) { - logger.debug(getAiravataLogMessage(etjId, msg), t); - } - - @Override - public boolean isInfoEnabled() { - return logger.isInfoEnabled(); - } - - @Override - public void info(String msg) { - logger.info(msg); - } - - @Override - public void infoId(String etjId, String msg) { - logger.info(getAiravataLogMessage(etjId, msg)); - } - - @Override - public void info(String format, Object arg) { - logger.info(format, arg); - } - - @Override - public void infoId(String etjId, String format, Object arg) { - logger.info(getAiravataLogMessage(etjId, format), arg); - } - - @Override - public void info(String format, Object arg1, Object arg2) { - logger.info(format, arg1, arg2); - } - - @Override - public void infoId(String etjId, String format, Object arg1, Object arg2) { - logger.info(getAiravataLogMessage(etjId, format), arg1, arg2); - } - - @Override - public void info(String format, Object... arguments) { - logger.info(format, arguments); - } - - @Override - public void infoId(String etjId, String format, Object... 
arguments) { - logger.info(getAiravataLogMessage(etjId, format), arguments); - } - - @Override - public void info(String msg, Throwable t) { - logger.info(msg, t); - } - - @Override - public void infoId(String etjId, String msg, Throwable t) { - logger.info(getAiravataLogMessage(etjId, msg), t); - } - - @Override - public boolean isWarnEnabled() { - return logger.isWarnEnabled(); - } - - @Override - public void warn(String msg) { - logger.warn(msg); - } - - @Override - public void warnId(String etjId, String msg) { - logger.warn(getAiravataLogMessage(etjId, msg)); - } - - @Override - public void warn(String format, Object arg) { - logger.warn(format, arg); - } - - @Override - public void warnId(String etjId, String format, Object arg) { - logger.warn(getAiravataLogMessage(etjId, format), arg); - } - - @Override - public void warn(String format, Object... arguments) { - logger.warn(format, arguments); - } - - @Override - public void warnId(String etjId, String format, Object... arguments) { - logger.warn(getAiravataLogMessage(etjId, format), arguments); - } - - @Override - public void warn(String format, Object arg1, Object arg2) { - logger.warn(format, arg1, arg2); - } - - @Override - public void warnId(String etjId, String format, Object arg1, Object arg2) { - logger.warn(getAiravataLogMessage(etjId, format), arg1, arg2); - } - - @Override - public void warn(String msg, Throwable t) { - logger.warn(msg, t); - } - - @Override - public void warnId(String etjId, String msg, Throwable t) { - logger.warn(getAiravataLogMessage(etjId, msg), t); - } - - @Override - public boolean isErrorEnabled() { - return logger.isErrorEnabled(); - } - - @Override - public void error(String msg) { - logger.error(msg); - } - - @Override - public void errorId(String etjId, String msg) { - logger.error(getAiravataLogMessage(etjId, msg)); - } - - @Override - public void error(String format, Object arg) { - logger.error(format, arg); - } - - @Override - public void errorId(String etjId, 
String format, Object arg) { - logger.error(getAiravataLogMessage(etjId, format), arg); - } - - @Override - public void error(String format, Object arg1, Object arg2) { - logger.error(format, arg1, arg2); - } - - @Override - public void errorId(String etjId, String format, Object arg1, Object arg2) { - logger.error(getAiravataLogMessage(etjId, format), arg1, arg2); - } - - @Override - public void error(String format, Object... arguments) { - logger.error(format, arguments); - } - - @Override - public void errorId(String etjId, String format, Object... arguments) { - logger.error(getAiravataLogMessage(etjId, format), arguments); - } - - @Override - public void error(String msg, Throwable t) { - logger.error(msg, t); - } - - @Override - public void errorId(String etjId, String msg, Throwable t) { - logger.error(getAiravataLogMessage(etjId, msg), t); - } - - private String getAiravataLogMessage(String etjId, String msg) { - return new StringBuilder("Id:").append(etjId).append(" : ").append(msg).toString(); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 3f312fd..adcde31 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -25,9 +25,13 @@ import java.net.InetAddress; import java.net.UnknownHostException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ServerSettings extends ApplicationSettings { + private static final Logger log = LoggerFactory.getLogger(ServerSettings.class); + 
private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_USER_PASSWORD = "default.registry.password"; private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway"; @@ -42,6 +46,8 @@ public class ServerSettings extends ApplicationSettings { public static final String GFAC_SERVER_HOST = "gfac.server.host"; public static final String GFAC_SERVER_PORT = "gfac.server.port"; public static final String GFAC_SERVER_NAME = "gfac.server.name"; + public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size"; + public static final int DEFAULT_GFAC_THREAD_POOL_SIZE = 50; public static final String GFAC_CONFIG_XML = "gfac-config.xml"; // Credential Store constants public static final String CREDENTIAL_SERVER_HOST = "credential.store.server.host"; @@ -294,4 +300,20 @@ public class ServerSettings extends ApplicationSettings { return getSetting(GFAC_SERVER_PORT); } + public static int getGFacThreadPoolSize() { /* returns the configured gfac.thread.pool.size, or DEFAULT_GFAC_THREAD_POOL_SIZE when it is missing, blank, unreadable or not an integer */ + try { + String threadPoolSize = getSetting(GFAC_THREAD_POOL_SIZE); + if (threadPoolSize != null && !threadPoolSize.trim().isEmpty()) { + return Integer.parseInt(threadPoolSize.trim()); /* trim: surrounding whitespace in the properties value must not break parsing */ + } + log.warn("Thread pool size is not configured, use default gfac thread pool size " + DEFAULT_GFAC_THREAD_POOL_SIZE); + } catch (ApplicationSettingsException e) { + log.warn("Couldn't read thread pool size from configuration on exception, use default gfac thread pool " + + "size " + DEFAULT_GFAC_THREAD_POOL_SIZE, e); + } catch (NumberFormatException e) { /* malformed value: fall back to the default instead of propagating an unchecked exception */ + log.warn("Thread pool size is not a valid integer, use default gfac thread pool size " + DEFAULT_GFAC_THREAD_POOL_SIZE, e); + } + return DEFAULT_GFAC_THREAD_POOL_SIZE; + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 9d20609..1e52a16 100644 --- 
a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -82,7 +82,6 @@ apiserver.host=localhost apiserver.port=8930 apiserver.min.threads=50 - ########################################################################### # Orchestrator Server Configurations ########################################################################### @@ -108,6 +107,7 @@ enable.validation=true gfac.server.name=gfac-node0 gfac.server.host=localhost gfac.server.port=8950 +gfac.thread.pool.size=50 host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler @@ -170,8 +170,6 @@ [email protected] # Security Configuration used by Airavata Generic Factory Service # to interact with Computational Resources. # -gfac.thread.pool.size=50 -airavata.server.thread.pool.size=50 gfac=org.apache.airavata.gfac.server.GfacServer myproxy.server=myproxy.teragrid.org myproxy.username=ogce http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java index fca4c98..ca3f5b2 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java @@ -20,9 +20,8 @@ */ package org.apache.airavata.gfac.core; -import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.common.utils.LocalEventPublisher; +import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.registry.cpi.ExperimentCatalog; import 
org.apache.curator.framework.CuratorFramework; @@ -34,33 +33,44 @@ import org.apache.curator.framework.CuratorFramework; public interface GFac { /** - * Launching a process, this method run process inflow task and job submission task. + * Initialization method; it must be called once before any other method is used. + * @param experimentCatalog + * @param appCatalog + * @param curatorClient + * @param publisher + * @return + */ + public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher); + + /** + * This is the job launching method outsiders of GFac can use; it will invoke the GFac handler chain and providers + * and update the registry accordingly, so the users can query the database to retrieve status and output from Registry * - * @param processContext + * @param experimentID * @return boolean Successful acceptence of the jobExecution returns a true value * @throws GFacException */ - public boolean submitProcess(ProcessContext processContext) throws GFacException; + public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException; /** - * This will invoke outflow tasks for a given process. - * @param processContext + * This method can be used in a handler to invoke the out-handlers asynchronously + * @param jobExecutionContext * @throws GFacException */ - public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException; + public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; /** - * This will reInvoke outflow tasks for a given process. 
- * @param processContext + * This method can be used to handle re-run case asynchronously + * @param jobExecutionContext * @throws GFacException */ - public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException; + public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; /** - * This operation can be used to cancel an already running process. + * This operation can be used to cancel an already running experiment * @return Successful cancellation will return true * @throws GFacException */ - public boolean cancelProcess(ProcessContext processContext)throws GFacException; + public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java index 9ae8c99..5bf09bf 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java @@ -21,15 +21,15 @@ package org.apache.airavata.gfac.core; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.logger.AiravataLogger; -import org.apache.airavata.common.logger.AiravataLoggerFactory; import org.apache.airavata.common.utils.ServerSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class GFacThreadPoolExecutor { - private final static AiravataLogger logger = 
AiravataLoggerFactory.getLogger(GFacThreadPoolExecutor.class); + private final static Logger logger = LoggerFactory.getLogger(GFacThreadPoolExecutor.class); public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size"; private static ExecutorService threadPool; http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 5a6d51d..294c3a9 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.gfac.core; +import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -100,6 +101,10 @@ public class GFacUtils { private GFacUtils() { } + public static ProcessContext populateProcessContext(ProcessContext processContext) { + return processContext; + } + /** * Read data from inputStream and convert it to String. 
* http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java deleted file mode 100644 index 2219f3a..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.airavata.gfac.core; - -import org.apache.airavata.gfac.core.context.ProcessContext; - -public class GFacWorker implements Runnable { - - - public GFacWorker(ProcessContext processContext) { - - } - - @Override - public void run() { - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java index a78d3f0..23b5cca 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java @@ -117,6 +117,7 @@ public class BetterGfacImpl implements GFac { return gfacInstance; } + @Override public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher) { this.experimentCatalog = experimentCatalog; http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java new file mode 100644 index 0000000..cdb22f6 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngine.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ + +package org.apache.airavata.gfac.impl; + +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.context.ProcessContext; + +public class GFacEngine { + + public static void createTaskChain(ProcessContext processContext) throws GFacException { + + } + + public static void executeProcess(ProcessContext processContext) throws GFacException { + + + } + + public static void recoverProcess(ProcessContext processContext) throws GFacException { + + } + + public static void runProcessOutflow(ProcessContext processContext) throws GFacException { + + } + + public static void recoverProcessOutflow(ProcessContext processContext) throws GFacException { + + } + + public static void cancelProcess() throws GFacException { + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java deleted file mode 100644 index 827ab55..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.airavata.gfac.impl; - -import 
org.apache.airavata.gfac.core.GFac; -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.context.ProcessContext; - -public class GFacImpl implements GFac { - - @Override - public boolean submitProcess(ProcessContext processContext) throws GFacException { - return false; - } - - @Override - public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException { - - } - - @Override - public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException { - - } - - @Override - public boolean cancelProcess(ProcessContext processContext) throws GFacException { - return false; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java new file mode 100644 index 0000000..d8aa094 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GFacWorker implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(GFacWorker.class); + private final ProcessContext processContext; + + public GFacWorker(ProcessContext processContext) throws AiravataException { + if (processContext == null) { + throw new AiravataException("Worker must initialize with valide processContext, Process context is null"); + } + this.processContext = processContext; + } + + @Override + public void run() { + ProcessType type = getProcessType(processContext); + try { + switch (type){ + case NEW: + GFacUtils.populateProcessContext(processContext); + GFacEngine.createTaskChain(processContext); + GFacEngine.executeProcess(processContext); + break; + case RECOVER: + // recover the process + GFacEngine.recoverProcess(processContext); + break; + case OUTFLOW: + // run the outflow task + GFacEngine.runProcessOutflow(processContext); + break; + case RECOVER_OUTFLOW: + // recover outflow task; + GFacEngine.recoverProcessOutflow(processContext); + } + } catch (GFacException e) { + switch (type) { + case NEW: log.error("Process execution error", e); break; + case RECOVER: log.error("Process recover error ", e); break; + case OUTFLOW: log.error("Process outflow execution error",e); break; + case RECOVER_OUTFLOW: log.error("Process outflow recover error",e); break; + } + } + } + + private ProcessType getProcessType(ProcessContext processContext) { + // check the status and return correct type of process. 
+ return ProcessType.NEW; + } + + + private enum ProcessType { + NEW, + RECOVER, + OUTFLOW, + RECOVER_OUTFLOW + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/2654de09/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 382cd5c..ec474d8 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -24,15 +24,14 @@ import com.google.common.eventbus.EventBus; import org.apache.airavata.common.exception.AiravataStartupException; import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.GFacConstants; -import org.apache.airavata.gfac.core.GFacWorker; +import org.apache.airavata.gfac.impl.GFacWorker; import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.logger.AiravataLogger; -import org.apache.airavata.common.logger.AiravataLoggerFactory; +import org.apache.airavata.common.log.AiravataLogger; +import org.apache.airavata.common.log.AiravataLoggerFactory; import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.listener.AbstractActivityListener; @@ -45,7 +44,6 @@ import org.apache.airavata.gfac.core.GFacUtils; import 
org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; import org.apache.airavata.gfac.impl.BetterGfacImpl; -import org.apache.airavata.gfac.impl.InputHandlerWorker; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingConstants; @@ -69,6 +67,8 @@ import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; @@ -77,9 +77,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class GfacServerHandler implements GfacService.Iface { - private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class); + private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class); private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; private static int requestCount=0; private ExperimentCatalog experimentCatalog; @@ -92,13 +94,15 @@ public class GfacServerHandler implements GfacService.Iface { private static File gfacConfigFile; private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>(); private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>(); + private ExecutorService executorService; public GfacServerHandler() throws AiravataStartupException { try { startCuratorClient(); initZkDataStructure(); initAMQPClient(); - localEventPublisher = new LocalEventPublisher(new EventBus()); + executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize()); + localEventPublisher = new LocalEventPublisher(new EventBus()); 
experimentCatalog = RegistryFactory.getDefaultExpCatalog(); appCatalog = RegistryFactory.getAppCatalog(); startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer); @@ -126,40 +130,23 @@ public class GfacServerHandler implements GfacService.Iface { * - /gfac-node0 (localhost:2181) *|/experiments */ - airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort(); + airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort(); // create PERSISTENT nodes ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath()); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE); // create EPHEMERAL server name node String gfacName = ServerSettings.getGFacServerName(); - if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)) == null) { + if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? + gfacName : "/" + gfacName)) == null) { curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) - .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)); + .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + + gfacName)); } curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? 
gfacName : "/" + gfacName), new String(airavataServerHostPort).getBytes()); } - public static void main(String[] args) { - RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null; - try { - rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); - rabbitMQTaskLaunchConsumer.listen(new TestHandler()); - } catch (AiravataException e) { - logger.error(e.getMessage(), e); - } - } - - private long ByateArrayToLong(byte[] data) { - long value = 0; - for (int i = 0; i < data.length; i++) - { - value += ((long) data[i] & 0xffL) << (8 * i); - } - return value; - } - public String getGFACServiceVersion() throws TException { return gfac_cpi_serviceConstants.GFAC_CPI_VERSION; } @@ -181,42 +168,38 @@ public class GfacServerHandler implements GfacService.Iface { * @param processId - processModel id in registry * @param gatewayId - gateway Identification */ - public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws TException { + public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws + TException { requestCount++; - logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------"); - logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId, processId); + log.info("-----------------------------------" + requestCount + "-----------------------------------------"); + log.info("GFac Received submit job request for the Experiment: {} process: {}", experimentId, /* SLF4J takes the format string first; the id is only a placeholder argument */ + processId); ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId); processContext.setAppCatalog(appCatalog); processContext.setExperimentCatalog(experimentCatalog); processContext.setCuratorClient(curatorClient); processContext.setLocalEventPublisher(localEventPublisher); - - GFacWorker worker = new GFacWorker(processContext); - InputHandlerWorker 
inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId, - processId, gatewayId, tokenId); -// try { -// if( gfac.submitJob(experimentId, taskId, gatewayId)){ - logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " + - "{}", experimentId, processId, gatewayId); - - GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker); - - // we immediately return when we have a threadpool + try { + executorService.execute(new GFacWorker(processContext)); + } catch (AiravataException e) { + log.error("Failed to submit process", e); + return false; + } return true; } public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { - logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); + log.info("GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); try { if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) { - logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId); + log.debug("Successfully cancelled job, experiment {} , task {}", experimentId, taskId); return true; } else { - logger.errorId(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId); + log.error("Job cancellation failed, experiment {} , task {}", experimentId, taskId); return false; } } catch (Exception e) { - logger.errorId(experimentId, "Error cancelling the experiment {}.", experimentId); + log.error("Error cancelling the experiment {}.", experimentId, e); /* trailing Throwable is logged with its stack trace */ throw new TException("Error cancelling the experiment : " + e.getMessage(), e); } } @@ -247,11 +230,11 @@ public class GfacServerHandler implements GfacService.Iface { AbstractActivityListener abstractActivityListener = aClass.newInstance(); 
activityListeners.add(abstractActivityListener); abstractActivityListener.setup(publisher, experimentCatalog, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer); - logger.info("Registering listener: " + listenerClass); + log.info("Registering listener: " + listenerClass); publisher.registerListener(abstractActivityListener); } } catch (Exception e) { - logger.error("Error loading the listener classes configured in airavata-server.properties", e); + log.error("Error loading the listener classes configured in airavata-server.properties", e); } } private static class TestHandler implements MessageHandler{ @@ -276,7 +259,7 @@ public class GfacServerHandler implements GfacService.Iface { ThriftUtils.createThriftFromBytes(bytes, event); System.out.println(event.getExperimentId()); } catch (TException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } } @@ -320,13 +303,13 @@ public class GfacServerHandler implements GfacService.Iface { AiravataZKUtils.getExpStatePath(event.getExperimentId()); submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } } catch (TException e) { - logger.error(e.getMessage(), e); //nobody is listening so nothing to throw + log.error(e.getMessage(), e); //nobody is listening so nothing to throw } catch (RegistryException e) { - logger.error("Error while updating experiment status", e); + log.error("Error while updating experiment status", e); } } else if (message.getType().equals(MessageType.TERMINATETASK)) { boolean cancelSuccess = false; @@ -345,7 +328,7 @@ public class GfacServerHandler implements GfacService.Iface { "This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled."); } } catch (Exception e) { - logger.error(e.getMessage(), e); + 
log.error(e.getMessage(), e); }finally { if (cancelSuccess) { // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message. @@ -358,7 +341,7 @@ public class GfacServerHandler implements GfacService.Iface { rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } } catch (Exception e) { - logger.error("Error while ack to cancel request, experimentId: " + event.getExperimentId()); + log.error("Error while ack to cancel request, experimentId: " + event.getExperimentId()); } } }
