yuqi1129 commented on code in PR #7772:
URL: https://github.com/apache/gravitino/pull/7772#discussion_r2218448720


##########
core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector.job;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+
+/**
+ * The JobExecutor interface defines the API for executing jobs in a specific 
Job runner, for
+ * example, Airflow, local runner, etc. The developer can implement this 
interface to adapt to the
+ * specific job runner. The Gravitino core will use this interface to submit 
jobs, get job status,
+ * etc.
+ */
+@DeveloperApi
+public interface JobExecutor extends Closeable {
+
+  /**
+   * Initialize the job executor with the given configurations.
+   *
+   * @param configs A map of configuration key-value pairs.
+   */
+  void initialize(Map<String, String> configs);
+
+  /**
+   * Submit a job with the given name and job template to the external job 
runner.
+   *
+   * <p>The returned job identifier is unique in the external job runner, and 
can be used to track
+   * the job status or cancel it later.
+   *
+   * <p>The placeholders in the job template has already been replaced with 
the actual values before
+   * calling this method. So the implementors can directly use this job 
template to submit the job
+   * to the external job runner.
+   *
+   * @param jobTemplate The job template containing the job configuration and 
parameters.
+   * @return A unique identifier for the submitted job.
+   */
+  String submitJob(JobTemplate jobTemplate);
+
+  /**
+   * Get the status of a job by its unique identifier. The status should be 
one of the values in
+   * {@link JobHandle.Status}. The implementors should query the external job 
runner to get the job
+   * status, and map the status to the values in {@link JobHandle.Status}.
+   *
+   * @param jobId The unique identifier of the job.
+   * @return The status of the job.
+   * @throws NoSuchJobException If the job with the given identifier does not 
exist.
+   */
+  JobHandle.Status getJobStatus(String jobId) throws NoSuchJobException;
+
+  /**
+   * Cancel a job by its unique identifier. The job runner should stop the job 
if it is currently
+   * running. If the job is already completed, it should return directory 
without any error. If the

Review Comment:
   Typo: "directory" should be "directly".



##########
core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector.job;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+
+/**
+ * The JobExecutor interface defines the API for executing jobs in a specific 
Job runner, for
+ * example, Airflow, local runner, etc. The developer can implement this 
interface to adapt to the
+ * specific job runner. The Gravitino core will use this interface to submit 
jobs, get job status,
+ * etc.
+ */
+@DeveloperApi
+public interface JobExecutor extends Closeable {
+
+  /**
+   * Initialize the job executor with the given configurations.
+   *
+   * @param configs A map of configuration key-value pairs.
+   */
+  void initialize(Map<String, String> configs);
+
+  /**
+   * Submit a job with the given name and job template to the external job 
runner.
+   *
+   * <p>The returned job identifier is unique in the external job runner, and 
can be used to track
+   * the job status or cancel it later.
+   *
+   * <p>The placeholders in the job template has already been replaced with 
the actual values before
+   * calling this method. So the implementors can directly use this job 
template to submit the job
+   * to the external job runner.
+   *
+   * @param jobTemplate The job template containing the job configuration and 
parameters.
+   * @return A unique identifier for the submitted job.
+   */
+  String submitJob(JobTemplate jobTemplate);
+
+  /**
+   * Get the status of a job by its unique identifier. The status should be 
one of the values in
+   * {@link JobHandle.Status}. The implementors should query the external job 
runner to get the job
+   * status, and map the status to the values in {@link JobHandle.Status}.
+   *
+   * @param jobId The unique identifier of the job.
+   * @return The status of the job.
+   * @throws NoSuchJobException If the job with the given identifier does not 
exist.
+   */
+  JobHandle.Status getJobStatus(String jobId) throws NoSuchJobException;
+
+  /**
+   * Cancel a job by its unique identifier. The job runner should stop the job 
if it is currently
+   * running. If the job is already completed, it should return directory 
without any error. If the
+   * job does not exist, it should throw a {@link NoSuchJobException}.
+   *
+   * <p>This method should not block on the job cancellation, it should return 
immediately, and

Review Comment:
   The phrase `This method should not block on the job cancellation` is confusing.
Should it be `This method should not be blocked...`?



##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -226,8 +267,243 @@ public JobEntity getJob(String metalake, String jobId) 
throws NoSuchJobException
         });
   }
 
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, 
String> jobConf)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Check if the job template exists, will throw NoSuchJobTemplateException 
if it does not exist.
+    JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, 
jobTemplateName);
+
+    // Create staging directory.
+    // TODO(jerry). The job staging directory will be deleted using a 
background thread.
+    long jobId = idGenerator.nextId();
+    File jobStagingDir = new File(stagingDir, JobHandle.JOB_ID_PREFIX + jobId);
+    if (!jobStagingDir.mkdirs()) {
+      throw new RuntimeException(
+          String.format("Failed to create staging directory %s for job %s", 
jobStagingDir, jobId));
+    }
+
+    // Create a JobTemplate by replacing the template parameters with the 
jobConf values, and
+    // also downloading any necessary files from the URIs specified in the job 
template.
+    JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, 
jobConf, jobStagingDir);
+
+    // Submit the job template to the job executor
+    String jobExecutionId;
+    try {
+      jobExecutionId = jobExecutor.submitJob(jobTemplate);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to submit job template %s for execution", 
jobTemplate), e);
+    }
+
+    // Create a new JobEntity to represent the job
+    JobEntity jobEntity =
+        JobEntity.builder()
+            .withId(jobId)
+            .withJobExecutionId(jobExecutionId)
+            .withJobTemplateName(jobTemplateName)
+            .withStatus(JobHandle.Status.QUEUED)
+            .withNamespace(NamespaceUtil.ofJob(metalake))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      entityStore.put(jobEntity, false /* overwrite */);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to register the job entity " + 
jobEntity, e);

Review Comment:
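   Do we need to cancel the submitted job if an exception occurs while storing the
job entity in the entity store? The job has already been submitted to the job executor
at this point. If the store fails, the job would keep running in the external job runner
without any corresponding entity in Gravitino to track or cancel it.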



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to