[
https://issues.apache.org/jira/browse/AIRFLOW-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655961#comment-16655961
]
Pandu commented on AIRFLOW-3215:
--------------------------------
AWSTemplateFormatVersion: '2010-09-09'
Description: Airflow server backed by Postgres RDS
Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access into the Airflow web server
    Type: AWS::EC2::KeyPair::KeyName
    ConstraintDescription: Must be the name of an existing EC2 KeyPair
  S3BucketName:
    Description: REQUIRED - A new S3 bucket name. This bucket will be used to read and write the Movielens dataset.
    Type: String
    AllowedPattern: '.+'
  DBPassword:
    Default: airflowpassword
    NoEcho: 'true'
    Description: Airflow database admin account password
    Type: String
    MinLength: '8'
    MaxLength: '41'
    AllowedPattern: '[a-zA-Z0-9]*'
    ConstraintDescription: Must contain only alphanumeric characters
# Mapping to find the Amazon Linux AMI in each region.
Mappings:
  RegionMap:
    us-east-1:
      AMI: ami-97785bed
Resources:
  EC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      KeyName: !Ref 'KeyName'
      SecurityGroups: [!Ref 'AirflowEC2SecurityGroup']
      InstanceType: 'm4.xlarge'
      IamInstanceProfile:
        Ref: EC2InstanceProfile
      Tags:
        - Key: Name
          Value: Airflow
      ImageId: !FindInMap
        - RegionMap
        - !Ref 'AWS::Region'
        - AMI
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          set -x
          exec > >(tee /var/log/user-data.log|logger -t user-data ) 2>&1
          # Get the latest CloudFormation package
          echo "Installing aws-cfn"
          yum install -y aws-cfn-bootstrap
          # Start cfn-init
          /opt/aws/bin/cfn-init -v -c install --stack ${AWS::StackId} --resource EC2Instance --region ${AWS::Region}
          # Download and unzip the Movielens dataset
          wget http://files.grouplens.org/datasets/movielens/ml-latest.zip && unzip ml-latest.zip
          # Upload the Movielens dataset files to the S3 bucket
          aws s3 cp ml-latest s3://${S3BucketName} --recursive
          # Install git
          sudo yum install -y git
          # Clone the git repository
          git clone https://github.com/aws-samples/aws-concurrent-data-orchestration-pipeline-emr-livy.git
          sudo pip install boto3
          # Install Airflow using pip
          echo "Install Apache Airflow"
          sudo pip install apache-airflow
          # Encrypt connection passwords in the metadata db
          sudo pip install apache-airflow[crypto]
          # Postgres operators and hook, support as an Airflow backend
          sudo pip install apache-airflow[postgres]
          sudo -H pip install six==1.10.0
          sudo pip install --upgrade six
          sudo pip install markupsafe
          sudo pip install --upgrade MarkupSafe
          echo 'export PATH=/usr/local/bin:$PATH' >> /root/.bash_profile
          source /root/.bash_profile
          # Initialize Airflow
          airflow initdb
          # Update the RDS connection in the Airflow config file
          sed -i '/sql_alchemy_conn/s/^/#/g' ~/airflow/airflow.cfg
          sed -i '/sql_alchemy_conn/ a sql_alchemy_conn = postgresql://airflow:${DBPassword}@${DBInstance.Endpoint.Address}:${DBInstance.Endpoint.Port}/airflowdb' ~/airflow/airflow.cfg
          # Update the type of executor in the Airflow config file
          sed -i '/executor = SequentialExecutor/s/^/#/g' ~/airflow/airflow.cfg
          sed -i '/executor = SequentialExecutor/ a executor = LocalExecutor' ~/airflow/airflow.cfg
          airflow initdb
          # Move all the files to the ~/airflow directory. The Airflow config
          # file is set up to hold all the DAG-related files in the ~/airflow/ folder.
          mv aws-concurrent-data-orchestration-pipeline-emr-livy/* ~/airflow/
          # Delete the higher-level git repository directory
          rm -rf aws-concurrent-data-orchestration-pipeline-emr-livy
          # Replace the name of the S3 bucket in each of the .scala files. The
          # command below replaces every instance of the string '<s3-bucket>' in
          # the scripts with the name of the bucket created by this stack.
          sed -i 's/<s3-bucket>/${S3BucketName}/g' /root/airflow/dags/transform/*
          # Run the Airflow webserver
          airflow webserver
    Metadata:
      AWS::CloudFormation::Init:
        configSets:
          install:
            - gcc
        gcc:
          packages:
            yum:
              gcc: []
    DependsOn:
      - DBInstance
      - AirflowEC2SecurityGroup
  DBInstance:
    Type: AWS::RDS::DBInstance
    DeletionPolicy: Delete
    Properties:
      DBName: airflowdb
      Engine: postgres
      MasterUsername: airflow
      MasterUserPassword: !Ref 'DBPassword'
      DBInstanceClass: db.t2.small
      AllocatedStorage: 5
      DBSecurityGroups:
        - Ref: DBSecurityGroup
  AirflowEC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: AirflowEC2SG
      GroupDescription: Enable HTTP access via port 80 + SSH access
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 8080
          ToPort: 8080
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0
  AirflowEMRMasterEC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: AirflowEMRMasterSG
      GroupDescription: Airflow EMR Master SG
    DependsOn:
      - AirflowEC2SecurityGroup
  AirflowEMRMasterInboundRule:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: tcp
      FromPort: '8998'
      ToPort: '8998'
      SourceSecurityGroupName: !Ref 'AirflowEC2SecurityGroup'
      GroupName: !Ref 'AirflowEMRMasterEC2SecurityGroup'
  AirflowEMRSlaveEC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: AirflowEMRSlaveSG
      GroupDescription: Airflow EMR Slave SG
  DBSecurityGroup:
    Type: AWS::RDS::DBSecurityGroup
    Properties:
      GroupDescription: Frontend Access
      DBSecurityGroupIngress:
        EC2SecurityGroupName:
          Ref: AirflowEC2SecurityGroup
  EC2Role:
    Type: AWS::IAM::Role
    Properties:
      RoleName: AirflowInstanceRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "ec2.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
  EC2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: AirflowInstanceProfile
      Roles:
        - Ref: EC2Role
  EmrRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: EmrRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "elasticmapreduce.amazonaws.com"
                - "s3.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
  EmrEc2Role:
    Type: AWS::IAM::Role
    Properties:
      RoleName: EmrEc2Role
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "ec2.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
  EmrEc2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: EmrEc2InstanceProfile
      Roles:
        - Ref: EmrEc2Role
  S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      AccessControl: BucketOwnerFullControl
      BucketName: !Ref 'S3BucketName'
Outputs:
  AirflowEC2PublicDNSName:
    Description: Public DNS Name of the Airflow EC2 instance
    Value: !Join ["", ["http://", !GetAtt EC2Instance.PublicDnsName, ":8080"]]
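
The template above can be deployed with the standard AWS CLI. A minimal sketch, assuming the template is saved locally as airflow.yaml; the stack name, key pair name, and bucket name below are placeholders, not values from the template:

```shell
# Deploy the Airflow CloudFormation stack. CAPABILITY_NAMED_IAM is required
# because the template creates IAM roles with explicit names (AirflowInstanceRole, etc.).
aws cloudformation create-stack \
  --stack-name airflow-stack \
  --template-body file://airflow.yaml \
  --capabilities CAPABILITY_NAMED_IAM \
  --parameters ParameterKey=KeyName,ParameterValue=my-keypair \
               ParameterKey=S3BucketName,ParameterValue=my-movielens-bucket \
               ParameterKey=DBPassword,ParameterValue=airflowpassword

# Block until the stack finishes creating, then read the webserver URL
# from the AirflowEC2PublicDNSName output.
aws cloudformation wait stack-create-complete --stack-name airflow-stack
aws cloudformation describe-stacks --stack-name airflow-stack \
  --query "Stacks[0].Outputs[?OutputKey=='AirflowEC2PublicDNSName'].OutputValue" \
  --output text
```

Note that the S3BucketName parameter must be globally unique, since the template creates the bucket via the S3Bucket resource.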
> Creating EMR using python from airflow
> --------------------------------------
>
> Key: AIRFLOW-3215
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3215
> Project: Apache Airflow
> Issue Type: Task
> Components: aws, DAG
> Affects Versions: 1.7.0
>          Environment: Airflow with boto3 - connecting to AWS, configured with access and security credentials
> Reporter: Pandu
> Priority: Major
>
> I have a problem with imports while creating an EMR cluster.
> import boto3
> connection = boto3.client('emr')
> cluster_id = connection.run_job_flow(
>     Name='emr123',
>     LogUri='s3://emr-spark-application/log.txt',
>     ReleaseLabel='emr-4.1.0',
>     Instances={
>         'InstanceGroups': [
>             {
>                 'Name': "Master nodes",
>                 'Market': 'ON_DEMAND',
>                 'InstanceRole': 'MASTER',
>                 'InstanceType': 'm1.large',
>                 'InstanceCount': 1
>             },
>             {
>                 'Name': "Slave nodes",
>                 'Market': 'ON_DEMAND',
>                 'InstanceRole': 'CORE',
>                 'InstanceType': 'm1.large',
>                 'InstanceCount': 1
>             }
>         ],
>         'KeepJobFlowAliveWhenNoSteps': True,
>         'TerminationProtected': False
>     },
>     Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
>     JobFlowRole='EMR_EC2_DefaultRole',
>     ServiceRole='EMR_DefaultRole'
> )
> print(cluster_id['JobFlowId'])
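
One way to isolate problems in the snippet above is to build the run_job_flow arguments as a plain dict first and pass them with **kwargs, so the structure can be inspected before any AWS call. A minimal sketch using the same names as the snippet; the helper function itself is hypothetical, not part of boto3:

```python
def build_emr_job_flow(name, log_uri, release_label='emr-4.1.0'):
    """Return a keyword dict for boto3's EMR run_job_flow call."""
    return {
        'Name': name,
        'LogUri': log_uri,
        'ReleaseLabel': release_label,
        'Instances': {
            'InstanceGroups': [
                {'Name': 'Master nodes', 'Market': 'ON_DEMAND',
                 'InstanceRole': 'MASTER', 'InstanceType': 'm1.large',
                 'InstanceCount': 1},
                {'Name': 'Slave nodes', 'Market': 'ON_DEMAND',
                 'InstanceRole': 'CORE', 'InstanceType': 'm1.large',
                 'InstanceCount': 1},
            ],
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
        },
        'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
        'JobFlowRole': 'EMR_EC2_DefaultRole',
        'ServiceRole': 'EMR_DefaultRole',
    }

params = build_emr_job_flow('emr123', 's3://emr-spark-application/log.txt')

# The actual call, once the parameters look right (requires AWS credentials):
# import boto3
# connection = boto3.client('emr')
# cluster = connection.run_job_flow(**params)
# print(cluster['JobFlowId'])
```

This also makes it easy to reuse the same parameters from an Airflow PythonOperator callable.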
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)