pdavis156879 opened a new issue #15357:
URL: https://github.com/apache/airflow/issues/15357
Airflow Version: 1.10.2
Python Version: 3.6.3
OS Version: 1.10.2
I'm attempting to start an EMR cluster with a pre-defined step, meaning I'm
combining EmrCreateJobFlowOperator, EmrAddStepsOperator, and EmrStepSensor
(originally I had the steps directly in the EmrCreateJobFlowOperator).
However, regardless of what I do, I always get the same error during the EMR
step execution:
```
Details :Caused by: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3a.S3AFileSystem not found
JAR location :command-runner.jar
```
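One thing I'm considering trying: from what I understand, `S3AFileSystem` lives in the `hadoop-aws` jar, while EMR's built-in EMRFS serves plain `s3://` URIs natively, so rewriting the scheme might sidestep the missing class entirely. A minimal sketch of that idea (this helper is mine, not part of the DAG, and the assumption that EMRFS handles `s3://` is mine as well):

```python
# Hypothetical helper: rewrite s3a:// (or s3n://) URIs to the s3:// scheme
# that EMR's native EMRFS handles without needing hadoop-aws on the classpath.
def to_emrfs_uri(uri: str) -> str:
    """Rewrite an s3a:// or s3n:// URI to the EMRFS s3:// scheme."""
    for prefix in ("s3a://", "s3n://"):
        if uri.startswith(prefix):
            return "s3://" + uri[len(prefix):]
    return uri  # already s3:// (or some other scheme): leave untouched

print(to_emrfs_uri("s3a://mybucket/scripts/etl.py"))  # s3://mybucket/scripts/etl.py
```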
I tried it with my full Python Spark code, and also with a simpler version
containing only a main that does a print and an import, yet the error persists.
The EMR DAG flow is:
```
SPARK_STEPS = [
    {
        'Name': 'test emr',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                's3a://' + s3_path + '/scripts/etl.py',
                '--execution_date={{ds}}',
                '--aws_key=' + aws_username,
                '--aws_secret=' + aws_password,
            ]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'Manifold ETL EMR',
    'LogUri': 's3://mybucket/logs/',
    'ReleaseLabel': 'emr-6.2.0',
    # --jars /usr/lib/hadoop/hadoop-aws.jar
    'Applications': [
        {'Name': 'Spark'},
        {'Name': 'Hadoop'},
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm4.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.s3_path }}/scripts/bootstrap.sh',
            }
        },
    ],
}

create_emr: EmrCreateJobFlowOperator = EmrCreateJobFlowOperator(
    task_id='start_emr',
    dag=dag,
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_credentials',
    emr_conn_id='emr_credentials',
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    dag=dag,
    job_flow_id="{{ task_instance.xcom_pull(task_ids='start_emr', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    dag=dag,
    job_flow_id="{{ task_instance.xcom_pull('start_emr', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_credentials',
)
```
and the basic Python file:
```
import boto3
import pyspark
from pyspark.sql.functions import (
    concat_ws, sha2, regexp_replace, lit, col,
    when, length, substring,
)

if __name__ == '__main__':
    print('Test complete')
```
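For completeness, the other workaround I plan to test next is the commented-out idea in `JOB_FLOW_OVERRIDES`: keeping the `s3a://` scheme but passing the bundled `hadoop-aws` jar to spark-submit via `--jars`. A sketch of that step definition (the jar path `/usr/lib/hadoop/hadoop-aws.jar` is an assumption about where emr-6.x ships it, and the bucket name is a placeholder):

```python
# Variant of SPARK_STEPS (a sketch, not the original code): add hadoop-aws,
# which contains org.apache.hadoop.fs.s3a.S3AFileSystem, to the Spark classpath.
SPARK_STEPS_WITH_JARS = [
    {
        'Name': 'test emr',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                # assumption: hadoop-aws lives here on emr-6.x images
                '--jars', '/usr/lib/hadoop/hadoop-aws.jar',
                's3a://mybucket/scripts/etl.py',
            ],
        },
    },
]

print(SPARK_STEPS_WITH_JARS[0]['HadoopJarStep']['Args'])
```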
I don't really see anything that could trigger this error; in the test Python
file I'm not even using an S3 handler, which raises the question: is this an
Airflow EMR Operator problem?
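While debugging, I've been pulling the failed step's `FailureDetails` out of the EMR DescribeStep API (the real call would be `boto3.client('emr').describe_step(ClusterId=..., StepId=...)`). A minimal sketch with the extraction factored out so it runs against a sample response (the helper and the sample values are mine, not from the DAG):

```python
# Hypothetical helper for inspecting a failed EMR step: extract
# Step.Status.FailureDetails from a DescribeStep-shaped response dict.
def failure_details(describe_step_response):
    """Return the FailureDetails mapping from an EMR DescribeStep response."""
    return (describe_step_response
            .get('Step', {})
            .get('Status', {})
            .get('FailureDetails', {}))

# Sample response shaped like the DescribeStep API (values are made up).
sample = {
    'Step': {
        'Status': {
            'State': 'FAILED',
            'FailureDetails': {
                'Reason': 'Unknown Error.',
                'Message': 'java.lang.ClassNotFoundException: '
                           'org.apache.hadoop.fs.s3a.S3AFileSystem',
            },
        },
    },
}

print(failure_details(sample)['Message'])
```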
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]