[
https://issues.apache.org/jira/browse/AIRFLOW-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653470#comment-16653470
]
Pandu commented on AIRFLOW-3215:
--------------------------------
Here is the code I have used to create the EMR cluster from the data pipeline :( Can you help me,
please?
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator, ExternalTaskSensor
from telemetry_pipeline_utils import *

# schedule anchor: two days ago at 10:00
START = datetime.combine(datetime.today() - timedelta(days=2),
                         datetime.min.time()) + timedelta(hours=10)
DAG_NAME = 'emr_model_building'

# initialize the DAG
default_args = {
    'pool': 'emr_model_building',
    'depends_on_past': False,
    'start_date': START,
    'retries': 1,
    'retry_delay': timedelta(seconds=120),
    'email_on_failure': True,
    'email_on_retry': True,
}
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='0 1 * * *')
# define the bash commands used in the tasks
launch_emr = """
{% if params.ENV == "PROD" %}
echo "Launching EMR cluster in Prod Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh launch,provision,deploy model_building_prod.conf
{% else %}
echo "Launching EMR cluster in Stage Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh launch,provision,deploy model_building_stage.conf
{% endif %}
"""

run_sm_and_reputation = """
{% if params.ENV == "PROD" %}
echo "Building sender models in Prod Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh sd model_building_prod.conf
{% else %}
echo "Building sender models in Stage Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh sd model_building_stage.conf
{% endif %}
"""

run_cdd = """
{% if params.ENV == "PROD" %}
echo "Building CDD in Prod Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh cdd model_building_prod.conf
{% else %}
echo "Building CDD in Stage Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh cdd model_building_stage.conf
{% endif %}
"""

terminate_cluster = """
{% if params.import_terminate_emr_cluster == true %}
{% if params.ENV == "PROD" %}
echo "Terminating EMR cluster in Prod Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh terminate model_building_prod.conf
{% else %}
echo "Terminating EMR cluster in Stage Env"
source ~/.bash_profile; /root/airflow/dags/cluster.sh terminate model_building_stage.conf
{% endif %}
{% else %}
echo "NOT terminating EMR cluster"
{% endif %}
"""
# define the individual tasks using Operators
t0 = ExternalTaskSensor(
    task_id='wait_for_previous_run',
    trigger_rule='one_success',
    external_dag_id=DAG_NAME,
    external_task_id='terminate_cluster',
    allowed_states=['success'],
    execution_delta=timedelta(days=1),
    dag=dag)

t1 = BashOperator(
    task_id='launch_emr',
    bash_command=launch_emr,
    execution_timeout=timedelta(hours=6),
    pool='emr_model_building',
    params={'ENV': ENV,
            'import_terminate_emr_cluster': import_terminate_emr_cluster},
    dag=dag)

t2 = BashOperator(
    task_id='run_sm_and_reputation',
    bash_command=run_sm_and_reputation,
    execution_timeout=timedelta(hours=3),
    pool='emr_model_building',
    params={'ENV': ENV},
    dag=dag)

t3 = BashOperator(
    task_id='run_cdd',
    bash_command=run_cdd,
    execution_timeout=timedelta(hours=3),
    pool='emr_model_building',
    params={'ENV': ENV},
    dag=dag)

t4 = BashOperator(
    task_id='terminate_cluster',
    bash_command=terminate_cluster,
    execution_timeout=timedelta(hours=1),
    params={'ENV': ENV,
            'import_terminate_emr_cluster': import_terminate_emr_cluster},
    pool='emr_model_building',
    dag=dag)

# construct the DAG
t1.set_upstream(t0)
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)
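
For reference, newer Airflow releases (the 1.10.x contrib package, not available in 1.7.0) ship EMR operators that could replace the cluster.sh wrapper for the launch and terminate steps. This is only a rough, untested sketch: the connection ids ('aws_default', 'emr_default') and the job_flow_overrides values are placeholders, not settings from this pipeline.

from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator

# Placeholder cluster spec; presumably this would mirror model_building_prod.conf / _stage.conf.
JOB_FLOW_OVERRIDES = {
    'Name': 'emr_model_building',
    'ReleaseLabel': 'emr-4.1.0',
    'Instances': {
        'InstanceGroups': [
            {'Name': 'Master nodes', 'Market': 'ON_DEMAND',
             'InstanceRole': 'MASTER', 'InstanceType': 'm1.large', 'InstanceCount': 1},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
}

create_emr = EmrCreateJobFlowOperator(
    task_id='create_emr_cluster',
    aws_conn_id='aws_default',             # AWS credentials stored as an Airflow connection
    emr_conn_id='emr_default',             # base job-flow config stored as an Airflow connection
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag)

terminate_emr = EmrTerminateJobFlowOperator(
    task_id='terminate_emr_cluster',
    aws_conn_id='aws_default',
    # the create operator pushes the new JobFlowId to XCom as its return value
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    dag=dag)

terminate_emr.set_upstream(create_emr)

With these, the sender-model and CDD BashOperators would sit between the create and terminate tasks, and the ENV switch handled by Jinja in the shell templates would move into the overrides dict instead.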
> Creating EMR using Python from Airflow
> --------------------------------------
>
> Key: AIRFLOW-3215
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3215
> Project: Apache Airflow
> Issue Type: Task
> Components: aws, DAG
> Affects Versions: 1.7.0
> Environment: Airflow with boto3, connecting to AWS, configured with
> access key and security credentials
> Reporter: Pandu
> Priority: Major
>
> I have a problem with imports while creating an EMR cluster.
> import boto3
> connection = boto3.client(
>     'emr'
> )
> cluster_id = connection.run_job_flow(
>     Name='emr123',
>     LogUri='s3://emr-spark-application/log.txt',
>     ReleaseLabel='emr-4.1.0',
>     Instances={
>         'InstanceGroups': [
>             {
>                 'Name': "Master nodes",
>                 'Market': 'ON_DEMAND',
>                 'InstanceRole': 'MASTER',
>                 'InstanceType': 'm1.large',
>                 'InstanceCount': 1
>             },
>             {
>                 'Name': "Slave nodes",
>                 'Market': 'ON_DEMAND',
>                 'InstanceRole': 'CORE',
>                 'InstanceType': 'm1.large',
>                 'InstanceCount': 1
>             }
>         ],
>         'KeepJobFlowAliveWhenNoSteps': True,
>         'TerminationProtected': False
>     },
>     Applications=[{
>         'Name': 'Hadoop'
>     }, {
>         'Name': 'Spark'
>     }],
>     JobFlowRole='EMR_EC2_DefaultRole',
>     ServiceRole='EMR_DefaultRole'
> )
> print(cluster_id['JobFlowId'])
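
If the goal is to run the quoted boto3 snippet from a DAG, one option is to wrap it in a PythonOperator so the boto3 import lives inside the DAG file (or a module it imports). A minimal, untested sketch, assuming boto3 credentials are already available on the worker; the region, task_id, and dag object are placeholders, not values from the issue.

import boto3
from airflow.operators.python_operator import PythonOperator  # import path on Airflow 1.x

def create_emr_cluster():
    # Create the EMR job flow and return its id; PythonOperator pushes the return value to XCom.
    connection = boto3.client('emr', region_name='us-east-1')  # region is an assumption
    response = connection.run_job_flow(
        Name='emr123',
        LogUri='s3://emr-spark-application/log.txt',
        ReleaseLabel='emr-4.1.0',
        Instances={
            'InstanceGroups': [
                {'Name': 'Master nodes', 'Market': 'ON_DEMAND',
                 'InstanceRole': 'MASTER', 'InstanceType': 'm1.large', 'InstanceCount': 1},
                {'Name': 'Slave nodes', 'Market': 'ON_DEMAND',
                 'InstanceRole': 'CORE', 'InstanceType': 'm1.large', 'InstanceCount': 1},
            ],
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
        },
        Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole')
    return response['JobFlowId']

create_cluster = PythonOperator(
    task_id='create_emr_cluster',
    python_callable=create_emr_cluster,
    dag=dag)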