[ https://issues.apache.org/jira/browse/AIRFLOW-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655961#comment-16655961 ]

Pandu commented on AIRFLOW-3215:
--------------------------------

AWSTemplateFormatVersion: '2010-09-09'

Description: Airflow server backed by Postgres RDS

Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access into the Airflow web server
    Type: AWS::EC2::KeyPair::KeyName
    ConstraintDescription: Must be the name of an existing EC2 KeyPair
  S3BucketName:
    Description: REQUIRED - A new S3 Bucket name. This bucket will be used to read and write the Movielens dataset.
    Type: String
    AllowedPattern: '.+'
  DBPassword:
    Default: airflowpassword
    NoEcho: 'true'
    Description: Airflow database admin account password
    Type: String
    MinLength: '8'
    MaxLength: '41'
    AllowedPattern: '[a-zA-Z0-9]*'
    ConstraintDescription: Must contain only alphanumeric characters

# Mapping to find the Amazon Linux AMI for the stack's region (only us-east-1 is mapped here).
Mappings:
  RegionMap:
    us-east-1:
      AMI: ami-97785bed
Resources:
  EC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      KeyName: !Ref 'KeyName'
      SecurityGroups: [!Ref 'AirflowEC2SecurityGroup']
      InstanceType: 'm4.xlarge'
      IamInstanceProfile:
        Ref: EC2InstanceProfile
      Tags:
        -
          Key: Name
          Value: Airflow
      ImageId: !FindInMap
        - RegionMap
        - !Ref 'AWS::Region'
        - AMI
      UserData:
        Fn::Base64: !Sub |
         #!/bin/bash
         set -x
         exec > >(tee /var/log/user-data.log|logger -t user-data ) 2>&1
         # Get the latest CloudFormation package
         echo "Installing aws-cfn"
         yum install -y aws-cfn-bootstrap
         # Start cfn-init
         /opt/aws/bin/cfn-init -v -c install --stack ${AWS::StackId} --resource EC2Instance --region ${AWS::Region}
         # Download and unzip the Movielens dataset
         wget http://files.grouplens.org/datasets/movielens/ml-latest.zip && unzip ml-latest.zip
         # Upload the movielens dataset files to the S3 bucket
         aws s3 cp ml-latest s3://${S3BucketName} --recursive
         # Install git
         sudo yum install -y git
         # Clone the git repository
         git clone https://github.com/aws-samples/aws-concurrent-data-orchestration-pipeline-emr-livy.git
         sudo pip install boto3
         # Install airflow using pip
         echo "Install Apache Airflow"
         sudo pip install apache-airflow
         # Encrypt connection passwords in metadata db
         sudo pip install apache-airflow[crypto]
         # Postgres operators and hook, plus support for Postgres as the Airflow backend
         sudo pip install apache-airflow[postgres]
         sudo -H pip install six==1.10.0
         sudo pip install --upgrade six
         sudo pip install markupsafe
         sudo pip install --upgrade MarkupSafe
         echo 'export PATH=/usr/local/bin:$PATH' >> /root/.bash_profile
         source /root/.bash_profile
         # Initialize Airflow
         airflow initdb
         # Update the RDS connection in the Airflow Config file
         sed -i '/sql_alchemy_conn/s/^/#/g' ~/airflow/airflow.cfg
         sed -i '/sql_alchemy_conn/ a sql_alchemy_conn = postgresql://airflow:${DBPassword}@${DBInstance.Endpoint.Address}:${DBInstance.Endpoint.Port}/airflowdb' ~/airflow/airflow.cfg
         # Update the type of executor in the Airflow Config file
         sed -i '/executor = SequentialExecutor/s/^/#/g' ~/airflow/airflow.cfg
         sed -i '/executor = SequentialExecutor/ a executor = LocalExecutor' ~/airflow/airflow.cfg
         airflow initdb
         # Move all the files to the ~/airflow directory. The Airflow config file is set up to keep all the DAG-related files in the ~/airflow/ folder.
         mv aws-concurrent-data-orchestration-pipeline-emr-livy/* ~/airflow/
         # Delete the higher-level git repository directory
         rm -rf aws-concurrent-data-orchestration-pipeline-emr-livy
         # Replace the S3 bucket placeholder in each of the .scala files. The command below replaces every instance of the string '<s3-bucket>' in the scripts with the name of the S3 bucket created by this stack.
         sed -i 's/<s3-bucket>/${S3BucketName}/g' /root/airflow/dags/transform/*
         # Run Airflow webserver
         airflow webserver
    Metadata:
      AWS::CloudFormation::Init:
        configSets:
          install:
            - gcc
        gcc:
          packages:
            yum:
              gcc: []
    DependsOn:
      - DBInstance
      - AirflowEC2SecurityGroup
  DBInstance:
    Type: AWS::RDS::DBInstance
    DeletionPolicy: Delete
    Properties:
      DBName: airflowdb
      Engine: postgres
      MasterUsername: airflow
      MasterUserPassword: !Ref 'DBPassword'
      DBInstanceClass: db.t2.small
      AllocatedStorage: 5
      DBSecurityGroups:
        - Ref: DBSecurityGroup
  AirflowEC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: AirflowEC2SG
      GroupDescription: Enable HTTP access via port 80 + SSH access
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 8080
          ToPort: 8080
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0
  AirflowEMRMasterEC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: AirflowEMRMasterSG
      GroupDescription: Airflow EMR Master SG
    DependsOn:
      - AirflowEC2SecurityGroup
  AirflowEMRMasterInboundRule:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: tcp
      FromPort: 8998
      ToPort: 8998
      SourceSecurityGroupName: !Ref 'AirflowEC2SecurityGroup'
      GroupName: !Ref 'AirflowEMRMasterEC2SecurityGroup'
  AirflowEMRSlaveEC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: AirflowEMRSlaveSG
      GroupDescription: Airflow EMR Slave SG
  DBSecurityGroup:
    Type: AWS::RDS::DBSecurityGroup
    Properties:
      GroupDescription: Frontend Access
      DBSecurityGroupIngress:
        EC2SecurityGroupName:
          Ref: AirflowEC2SecurityGroup
  EC2Role:
    Type: AWS::IAM::Role
    Properties:
      RoleName: AirflowInstanceRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "ec2.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
  EC2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: AirflowInstanceProfile
      Roles:
        -
          Ref: EC2Role
  EmrRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: EmrRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "elasticmapreduce.amazonaws.com"
                - "s3.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
  EmrEc2Role:
    Type: AWS::IAM::Role
    Properties:
      RoleName: EmrEc2Role
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "ec2.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
  EmrEc2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: EmrEc2InstanceProfile
      Roles:
        -
          Ref: EmrEc2Role
  S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      AccessControl: BucketOwnerFullControl
      BucketName: !Ref 'S3BucketName'
Outputs:
  AirflowEC2PublicDNSName:
    Description: Public DNS Name of the Airflow EC2 instance
    Value: !Join ["", ["http://", !GetAtt EC2Instance.PublicDnsName, ":8080"]]
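
For anyone trying this template: a minimal sketch of launching the stack with boto3 (the stack name, key pair name, bucket name, and local template filename below are placeholders; CAPABILITY_NAMED_IAM is required because the template creates named IAM roles):

# Minimal sketch: create the stack above with boto3.
# 'airflow-emr.yaml' is the template saved locally; all names are placeholders.
import boto3

cfn = boto3.client('cloudformation', region_name='us-east-1')  # only us-east-1 is in the AMI mapping

with open('airflow-emr.yaml') as f:
    template_body = f.read()

response = cfn.create_stack(
    StackName='airflow-emr-demo',
    TemplateBody=template_body,
    Parameters=[
        {'ParameterKey': 'KeyName', 'ParameterValue': 'my-keypair'},
        {'ParameterKey': 'S3BucketName', 'ParameterValue': 'my-movielens-bucket'},
    ],
    Capabilities=['CAPABILITY_NAMED_IAM'],  # the template creates named IAM roles
)
print(response['StackId'])

# Block until the stack is fully created before browsing to the Airflow UI.
cfn.get_waiter('stack_create_complete').wait(StackName='airflow-emr-demo')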

> Creating EMR using python from airflow
> --------------------------------------
>
>                 Key: AIRFLOW-3215
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3215
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: aws, DAG
>    Affects Versions: 1.7.0
>         Environment: Airflow with boto3 - connecting to AWS, configured with access and security credentials
>            Reporter: Pandu
>            Priority: Major
>
> I have a problem with imports while creating an EMR cluster:
> import boto3
> connection = boto3.client(
>     'emr'
> )
> cluster_id = connection.run_job_flow(
>           Name='emr123',
>           LogUri='s3://emr-spark-application/log.txt',
>           ReleaseLabel='emr-4.1.0',
>           Instances={
>             'InstanceGroups': [
>                 {
>                   'Name': "Master nodes",
>                   'Market': 'ON_DEMAND',
>                   'InstanceRole': 'MASTER',
>                   'InstanceType': 'm1.large',
>                   'InstanceCount': 1
>                 },
>                 {
>                   'Name': "Slave nodes",
>                   'Market': 'ON_DEMAND',
>                   'InstanceRole': 'CORE',
>                   'InstanceType': 'm1.large',
>                   'InstanceCount': 1
>                 }
>             ],
>             'KeepJobFlowAliveWhenNoSteps': True,
>             'TerminationProtected': False
>           },
>           Applications=[{
>              'Name': 'Hadoop'
>             }, {
>              'Name': 'Spark'
>           }],
>           JobFlowRole='EMR_EC2_DefaultRole',
>           ServiceRole='EMR_DefaultRole'
>         )
> print(cluster_id['JobFlowId'])
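
On the original question: instead of calling boto3 directly inside a task, later Airflow 1.x releases ship a contrib operator for exactly this. A minimal sketch, assuming Airflow 1.8+ with an 'aws_default' connection configured in Airflow (the DAG id, start date, and schedule are placeholders):

# Minimal sketch, assuming Airflow 1.8+ (contrib EMR operator available).
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

# Same cluster configuration as the run_job_flow call above.
JOB_FLOW_OVERRIDES = {
    'Name': 'emr123',
    'LogUri': 's3://emr-spark-application/log.txt',
    'ReleaseLabel': 'emr-4.1.0',
    'Instances': {
        'InstanceGroups': [
            {'Name': 'Master nodes', 'Market': 'ON_DEMAND',
             'InstanceRole': 'MASTER', 'InstanceType': 'm1.large', 'InstanceCount': 1},
            {'Name': 'Slave nodes', 'Market': 'ON_DEMAND',
             'InstanceRole': 'CORE', 'InstanceType': 'm1.large', 'InstanceCount': 1},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

dag = DAG('emr_create_job_flow', start_date=datetime(2018, 10, 1), schedule_interval=None)

create_cluster = EmrCreateJobFlowOperator(
    task_id='create_emr_cluster',
    aws_conn_id='aws_default',   # AWS credentials stored as an Airflow connection
    emr_conn_id='emr_default',   # base EMR config; job_flow_overrides takes precedence
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag,
)

This keeps the credentials in Airflow connections rather than in the DAG code, and the cluster id lands in XCom for downstream tasks (e.g. an EmrAddStepsOperator) to pick up.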


