Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152257452
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
+ def workflow_ctx(self):
+ if self._workflow_context is None:
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=self._execution.service.id,
+ execution_id=self._execution.id,
+ workflow_name=self._execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+ return self._workflow_context
+
+ def compile(self, execution_inputs=None, executor=None, execution_id=None):
+ assert not (execution_inputs and executor and execution_id)
+
+ if execution_id is None:
+ # If the execution is new
+ self._execution = self._create_execution_model(execution_inputs)
+ self._model.execution.put(self._execution)
+ self._create_tasks(executor)
+ self._model.execution.update(self._execution)
+ else:
+ # If resuming an execution
+ self._execution = self._model.execution.get(execution_id)
+
+ return self.workflow_ctx
+
+ def _create_tasks(self, executor=None):
+
+ # Set default executor and kwargs
+ executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+ # transforming the execution inputs to dict, to pass them to the workflow function
+ execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues())
+
+ if len(self._execution.tasks) == 0:
+ workflow_fn = self._get_workflow_fn(self._execution.workflow_name)
+ tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
--- End diff --
Suggested name: `api_tasks_graph` (presumably in place of `tasks_graph` on the last quoted diff line).
---