[ 
https://issues.apache.org/jira/browse/AIRFLOW-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653470#comment-16653470
 ] 

Pandu commented on AIRFLOW-3215:
--------------------------------

Here is the code i have used to create EMR by data pipeline :( can you help me 
please
import logging
import datetime
  
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import BashOperator, ExternalTaskSensor
from telemetry_pipeline_utils import *
  
START = datetime.combine(datetime.today() - timedelta(days=2), 
datetime.min.time()) + timedelta(hours=10)
DAG_NAME = 'emr_model_building'
  
# initialize the DAG
default_args = {
 'pool': 'emr_model_building',
 'depends_on_past':False,
 'start_date': START,
 'retries': 1,
 'retry_delay': timedelta(seconds=120),
 'email_on_failure': True,
 'email_on_retry': True
}
 
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='0 1 * * *')
 
# define the bash commands used in the tasks
launch_emr = """
 {% if params.ENV == "PROD" %}
 echo "Launching EMR cluster in Prod Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh launch,provision,deploy 
model_building_prod.conf
 {% else %}
 echo "Launching EMR cluster in Stage Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh launch,provision,deploy 
model_building_stage.conf
 {% endif %}
 """
 
run_sm_and_reputation = """
 {% if params.ENV == "PROD" %}
 echo "Building sender models in Prod Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh sd 
model_building_prod.conf
 {% else %}
 echo "Building sender models in Stage Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh sd 
model_building_stage.conf
 {% endif %}
 """
 
run_cdd = """
 {% if params.ENV == "PROD" %}
 echo "Building CDD in Prod Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh cdd 
model_building_prod.conf
 {% else %}
 echo "Building CDD in Stage Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh cdd 
model_building_stage.conf
 {% endif %}
 """
 
terminate_cluster = """
 {% if params.import_terminate_emr_cluster == true %}
 {% if params.ENV == "PROD" %}
 echo "Terminating EMR cluster in Prod Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh terminate 
model_building_prod.conf
 {% else %}
 echo "Terminating EMR cluster in Stage Env"
 source ~/.bash_profile; /root/airflow/dags/cluster.sh terminate 
model_building_stage.conf
 {% endif %}
 {% else %}
 echo "NOT terminating EMR cluster"
 {% endif %}
 """
 
# define the individual tasks using Operators
t0 = ExternalTaskSensor(
 task_id='wait_for_previous_run',
 trigger_rule='one_success',
 external_dag_id=DAG_NAME,
 external_task_id='terminate_cluster',
 allowed_states=['success'],
 execution_delta=timedelta(days=1),
 dag=dag)
 
t1 = BashOperator(
 task_id='launch_emr',
 bash_command=launch_emr,
 execution_timeout=timedelta(hours=6),
 pool='emr_model_building',
 params={'ENV': ENV, 
'import_terminate_emr_cluster':import_terminate_emr_cluster},
 dag=dag)
 
t2 = BashOperator(
 task_id='run_sm_and_reputation',
 bash_command=run_sm_and_reputation,
 execution_timeout=timedelta(hours=3),
 pool='emr_model_building',
 params={'ENV': ENV},
 dag=dag)
 
t3 = BashOperator(
 task_id='run_cdd',
 bash_command=run_cdd,
 execution_timeout=timedelta(hours=3),
 pool='emr_model_building',
 params={'ENV': ENV},
 dag=dag)
 
t4 = BashOperator(
 task_id='terminate_cluster',
 bash_command=terminate_cluster,
 execution_timeout=timedelta(hours=1),
 params={'ENV': ENV, 
'import_terminate_emr_cluster':import_terminate_emr_cluster},
 pool='emr_model_building',
 dag=dag)
 
# construct the DAG
t1.set_upstream(t0)
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)


> Creating EMR using python from airflow
> --------------------------------------
>
>                 Key: AIRFLOW-3215
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3215
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: aws, DAG
>    Affects Versions: 1.7.0
>         Environment: Airflow with boto3 - connecting AWS -configure with 
> access and security 
>            Reporter: Pandu
>            Priority: Major
>
> I have problem with imports while creating EMR. 
> import boto3
> connection = boto3.client(
>     'emr'
> )
> cluster_id = connection.run_job_flow(
>           Name='emr123',
>           LogUri='s3://emr-spark-application/log.txt',
>           ReleaseLabel='emr-4.1.0',
>           Instances={
>             'InstanceGroups': [
>                 {
>                   'Name': "Master nodes",
>                   'Market': 'ON_DEMAND',
>                   'InstanceRole': 'MASTER',
>                   'InstanceType': 'm1.large',
>                   'InstanceCount': 1
>                 },
>                 {
>                   'Name': "Slave nodes",
>                   'Market': 'ON_DEMAND',
>                   'InstanceRole': 'CORE',
>                   'InstanceType': 'm1.large',
>                   'InstanceCount': 1
>                 }
>             ],
>             'KeepJobFlowAliveWhenNoSteps': True,
>             'TerminationProtected': False
>           },
>           Applications=[{
>              'Name': 'Hadoop'
>             }, {
>              'Name': 'Spark'
>           }],
>           JobFlowRole='EMR_EC2_DefaultRole',
>           ServiceRole='EMR_DefaultRole'
>         )
> print (cluster_id['JobFlowId'])



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to