[ 
https://issues.apache.org/jira/browse/SQOOP-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Veena Basavaraj updated SQOOP-1510:
-----------------------------------
    Description: 
Adding more details to the ticket to explain how Job and Submission are 
structured in the code.

In Sqoop, we create a job by giving it the FROM and the TO link ids.

Something like this ...
{code}
create job -fromLink 1 -toLink 2
{code}

A job in Sqoop represents all the data required for the job we will submit to 
the execution engine. Hence the job holds the FROM link and its FROM connector 
details, and the TO link and its corresponding TO connector details. It also 
holds all the corresponding config values needed to invoke the FROM connector 
code and the TO connector code as part of the job lifecycle steps (init, 
partitions, load, extract, destroy).
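
To make the shape of a job concrete, here is a minimal Java sketch of the data 
it holds. The names used (JobSketch, Side, configValues, the connector names 
and config keys) are hypothetical and chosen only for illustration; they are 
not the actual Sqoop model classes.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: these are not the real Sqoop model classes.
final class JobSketch {

  // One side of a job (FROM or TO): the link, its connector, and its config values.
  static final class Side {
    final long linkId;
    final String connectorName;
    final Map<String, String> configValues;

    Side(long linkId, String connectorName, Map<String, String> configValues) {
      this.linkId = linkId;
      this.connectorName = connectorName;
      // Defensive copy, so later changes to the caller's map do not leak into the job.
      this.configValues =
          Collections.unmodifiableMap(new LinkedHashMap<String, String>(configValues));
    }
  }

  final Side from;
  final Side to;
  private boolean enabled = true;

  JobSketch(Side from, Side to) {
    this.from = from;
    this.to = to;
  }

  boolean isEnabled() {
    return enabled;
  }

  void setEnabled(boolean enabled) {
    this.enabled = enabled;
  }

  public static void main(String[] args) {
    Map<String, String> fromConfig = new LinkedHashMap<String, String>();
    fromConfig.put("table", "employees");
    Map<String, String> toConfig = new LinkedHashMap<String, String>();
    toConfig.put("outputDirectory", "/tmp/employees");

    // Mirrors "create job -fromLink 1 -toLink 2".
    JobSketch job = new JobSketch(
        new Side(1L, "example-from-connector", fromConfig),
        new Side(2L, "example-to-connector", toConfig));
    System.out.println("FROM link " + job.from.linkId + " -> TO link " + job.to.linkId);
  }
}
```

The point of the sketch is only that a job is self-contained: everything 
needed to run both connectors travels with it.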

So once the job is created, we can perform the following actions on it:

1. disable / re-enable it 
2. submit the job to the execution engine
3. at any point while it is running, abort it (we can also call this stop if 
we want to)

All of the below is handled by the 
{code}JobManager{code}, an internal class that receives the requests for 
operations on the job resource.

So what does submit really do?

1. Create a job request for the execution engine. This is an uber object that 
holds all the information described above, such as the FROM connector and TO 
connector details. It also holds a reference to the submission object. The 
submission object holds the results of the submit action. A new submission 
record is persisted into the Sqoop repository every time we call a job submit. 
This is represented as {code} MSubmission {code} internally.

{code}
    // Bootstrap job to execute in the configured execution engine
    prepareJob(jobRequest);

{code}

2. We then call the submissionEngine API to submit, which in turn chooses the 
configured execution engine (such as MR, or Spark in the future) and submits 
to it.
{code}
      boolean success = submissionEngine.submit(jobRequest);

{code}
For consistency, both the submission and the execution APIs are named submit.

3. Once we succeed we persist the record in the Sqoop repo.
{code}
      RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
{code}

4. If the execution engine fails to accept the job, we call some cleanup code, 
but still persist the submission record in the repo to record the "submit" 
action invoked from the client / REST API.

{code}
 public MSubmission submit(long jobId, HttpEventContext ctx) {

    MSubmission mJobSubmission = createJobSubmission(ctx, jobId);
    JobRequest jobRequest = createJobRequest(jobId, mJobSubmission);
    // Bootstrap job to execute in the configured execution engine
    prepareJob(jobRequest);
    // Make sure that this job id is not currently running and submit the job
    // only if it's not.
    synchronized (getClass()) {
      MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
          .findLastSubmissionForJob(jobId);
      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
        throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
      }
      }
      // TODO(jarcec): We might need to catch all exceptions here to ensure
      // that Destroyer will be executed in all cases.
      
      // NOTE: the following is a blocking call
      boolean success = submissionEngine.submit(jobRequest);
      if (!success) {
        cleanUpOnJobSubmissionFailure(jobRequest);
        mJobSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
      }
      // persist submission record to repository, whether or not the submit succeeded
      RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
    }
    return mJobSubmission;
  }

{code}
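
The ordering in the method above (guard against a running job, submit, persist 
the record either way) can be tried out with a small self-contained 
simulation. Everything in it is a stand-in: SubmitFlowSketch, 
InMemoryRepository and the Engine interface are hypothetical names, not Sqoop 
classes, and prepareJob and the cleanup call are left out. It also assumes 
that a successful submit leaves the submission in a running state, which the 
text above does not spell out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; none of these are the real Sqoop classes.
final class SubmitFlowSketch {

  enum Status {
    RUNNING, FAILURE_ON_SUBMIT;

    boolean isRunning() {
      return this == RUNNING;
    }
  }

  static final class Submission {
    final long jobId;
    Status status;

    Submission(long jobId) {
      this.jobId = jobId;
    }
  }

  // Stand-in for the submission/execution engine: true means the job was accepted.
  interface Engine {
    boolean submit(long jobId);
  }

  // In-memory replacement for the repository; keeps every submission in insertion order.
  static final class InMemoryRepository {
    private final List<Submission> submissions = new ArrayList<Submission>();

    void createSubmission(Submission submission) {
      submissions.add(submission);
    }

    Submission findLastSubmissionForJob(long jobId) {
      for (int i = submissions.size() - 1; i >= 0; i--) {
        if (submissions.get(i).jobId == jobId) {
          return submissions.get(i);
        }
      }
      return null;
    }

    int count() {
      return submissions.size();
    }
  }

  private final InMemoryRepository repository;
  private final Engine engine;

  SubmitFlowSketch(InMemoryRepository repository, Engine engine) {
    this.repository = repository;
    this.engine = engine;
  }

  // Guard against a running job, submit, then persist the record in both outcomes.
  synchronized Submission submit(long jobId) {
    Submission last = repository.findLastSubmissionForJob(jobId);
    if (last != null && last.status.isRunning()) {
      throw new IllegalStateException("Job with id " + jobId + " is already running");
    }
    Submission submission = new Submission(jobId);
    boolean success = engine.submit(jobId);
    // Assumption: a successful submit leaves the submission in a running state.
    submission.status = success ? Status.RUNNING : Status.FAILURE_ON_SUBMIT;
    repository.createSubmission(submission);
    return submission;
  }
}
```

With an engine that always returns false, each call to submit still adds one 
record with status FAILURE_ON_SUBMIT. With an engine that returns true, a 
second submit for the same job id is rejected before anything is persisted.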

Next, what happens if we want to stop/abort a running job?

1. The stop action only applies to a running job. We have an API that returns 
the last submission for a job, and we use it to check whether the requested 
job is in a running state. If it is not, the request is rejected with 
DRIVER_0003 rather than silently ignored.

{code}
    // look up the last submission for this job
    MSubmission mSubmission = repository.findLastSubmissionForJob(jobId);

    if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
      throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
          + " is not running hence cannot stop");
    }
   
{code}

2. If the job is running, we call the submission engine API to stop it, which 
in turn calls the corresponding execution engine API to kill the job. For 
consistency, we name all of these APIs stop/abort.

{code}
    submissionEngine.stop(mSubmission.getExternalId());
{code}

The engine side then looks up the running job and kills it:

{code}
      RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
      if (runningJob == null) {
        return;
      }

      runningJob.killJob();
{code}

3. Finally, we update the status of the submission in the repo by calling 
{code}updateSubmission{code}.
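
The three stop steps can be put together in a small self-contained sketch. 
StopFlowSketch, RecordingEngine and the STOPPED status are hypothetical names 
used only for illustration. In particular, the text above only says that the 
status is updated through updateSubmission; marking the submission as STOPPED 
is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these are the real Sqoop classes.
final class StopFlowSketch {

  enum Status {
    RUNNING, STOPPED;

    boolean isRunning() {
      return this == RUNNING;
    }
  }

  static final class Submission {
    final long jobId;
    final String externalId;
    Status status;

    Submission(long jobId, String externalId, Status status) {
      this.jobId = jobId;
      this.externalId = externalId;
      this.status = status;
    }
  }

  // Stand-in for the execution engine; it only records which external ids were killed.
  static final class RecordingEngine {
    final List<String> killed = new ArrayList<String>();

    void stop(String externalId) {
      killed.add(externalId);
    }
  }

  // Latest submission per job id, standing in for findLastSubmissionForJob.
  private final Map<Long, Submission> lastSubmissionByJob = new HashMap<Long, Submission>();
  private final RecordingEngine engine;

  StopFlowSketch(RecordingEngine engine) {
    this.engine = engine;
  }

  void recordSubmission(Submission submission) {
    lastSubmissionByJob.put(submission.jobId, submission);
  }

  Submission stop(long jobId) {
    Submission last = lastSubmissionByJob.get(jobId);
    // Step 1: refuse to stop a job that is not running.
    if (last == null || !last.status.isRunning()) {
      throw new IllegalStateException(
          "Job with id " + jobId + " is not running hence cannot stop");
    }
    // Step 2: ask the engine to kill the job, identified by its external id.
    engine.stop(last.externalId);
    // Step 3: update the stored status (assumption: STOPPED; see the note above).
    last.status = Status.STOPPED;
    return last;
  }
}
```

Calling stop twice for the same job shows the guard at work: the first call 
reaches the engine, the second is rejected because the last submission is no 
longer running.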

So in summary, the actions on a job will be reflected on the job resource.

The job will support enable / disable / submit / abort (stop), as well as get 
for the status and the job details.

Submission is a read-only resource which represents the side effect of 
submitting a job to the Sqoop execution engine.

While we change the REST APIs to support the above, we also have to make the 
corresponding changes in the Sqoop client.

Here are the details of the Sqoop client.

{code}

{code}



Why is the action called SUBMIT instead of START?

SUBMIT is a common term in both the MR and Spark execution engines. Also, as a 
side effect of SUBMIT we create a SUBMISSION record and store it in the 
repository.


There are many places in the code and in the javadocs where we actually mean 
submit when we say start. So why not just call it submit? The error messages 
below already use it:

  DRIVER_0008("Invalid combination of submission and execution engines"),

  DRIVER_0009("Job has been disabled. Cannot submit this job."),

  DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),

  DRIVER_0011("Connector does not support specified direction. Cannot submit this job."),
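
As an illustration of how these messages relate to submit, here is a minimal 
sketch of pre-submit checks. Only the code names and message strings come from 
the list above. The class SubmitPreconditionsSketch, the enum 
DriverErrorSketch, the check method, its parameters, and the order in which 
the checks run are all assumptions made for this example.

```java
// Hypothetical sketch: only the code names and message strings come from the ticket.
final class SubmitPreconditionsSketch {

  enum DriverErrorSketch {
    DRIVER_0009("Job has been disabled. Cannot submit this job."),
    DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),
    DRIVER_0011("Connector does not support specified direction. Cannot submit this job.");

    private final String message;

    DriverErrorSketch(String message) {
      this.message = message;
    }

    String getMessage() {
      return message;
    }
  }

  // Returns the first violated precondition, or null when the job may be submitted.
  // The order of the checks is an assumption of this sketch.
  static DriverErrorSketch check(boolean jobEnabled, boolean fromLinkEnabled,
      boolean toLinkEnabled, boolean directionsSupported) {
    if (!jobEnabled) {
      return DriverErrorSketch.DRIVER_0009;
    }
    if (!fromLinkEnabled || !toLinkEnabled) {
      return DriverErrorSketch.DRIVER_0010;
    }
    if (!directionsSupported) {
      return DriverErrorSketch.DRIVER_0011;
    }
    return null;
  }

  public static void main(String[] args) {
    // A disabled job is reported first, regardless of the other flags.
    DriverErrorSketch error = check(false, true, true, true);
    System.out.println(error + ": " + error.getMessage());
  }
}
```

Every message ends with "Cannot submit this job.", which is the wording the 
rename from start to submit is meant to match.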


  was:
The job will have enable/ disable/ submit / abort(stop)

Also get on the status and the job details

Submission is a read only resource



If someone is wondering why START is changed to submit: there are tons of 
places in the code and in the java docs, we actually mean submit when we say 
START

  DRIVER_0008("Invalid combination of submission and execution engines"),

  DRIVER_0009("Job has been disabled. Cannot submit this job."),

  DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),

  DRIVER_0011("Connector does not support specified direction. Cannot submit this job."),



> Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get 
> operation only
> --------------------------------------------------------------------------------------------
>
>                 Key: SQOOP-1510
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1510
>             Project: Sqoop
>          Issue Type: Sub-task
>            Reporter: Veena Basavaraj
>            Assignee: Veena Basavaraj



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
