This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new e7210d396e [#8783] feat(core): Add the job related events support to
the job system (#8799)
e7210d396e is described below
commit e7210d396e26c0dacbacc564c4432766ef9f97c6
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Oct 14 15:13:18 2025 +0800
[#8783] feat(core): Add the job related events support to the job system
(#8799)
### What changes were proposed in this pull request?
This is the second PR adding job-related event support to the job
system.
### Why are the changes needed?
Fix: #8783
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests (UTs).
---
.../gravitino/listener/JobEventDispatcher.java | 79 ++++++-
.../listener/api/event/job/CancelJobEvent.java | 63 ++++++
.../api/event/job/CancelJobFailureEvent.java | 52 +++++
.../listener/api/event/job/CancelJobPreEvent.java | 50 +++++
.../listener/api/event/job/GetJobEvent.java | 63 ++++++
.../listener/api/event/job/GetJobFailureEvent.java | 52 +++++
.../listener/api/event/job/GetJobPreEvent.java | 50 +++++
.../gravitino/listener/api/event/job/JobEvent.java | 49 +++++
.../listener/api/event/job/JobFailureEvent.java | 44 ++++
.../listener/api/event/job/JobPreEvent.java | 39 ++++
.../listener/api/event/job/ListJobsEvent.java | 64 ++++++
.../api/event/job/ListJobsFailureEvent.java | 67 ++++++
.../listener/api/event/job/ListJobsPreEvent.java | 63 ++++++
.../listener/api/event/job/RunJobEvent.java | 94 ++++++++
.../listener/api/event/job/RunJobFailureEvent.java | 82 +++++++
.../listener/api/event/job/RunJobPreEvent.java | 77 +++++++
.../gravitino/listener/api/info/JobInfo.java | 94 ++++++++
.../listener/api/event/TestJobEventDispatcher.java | 245 +++++++++++++++++++--
docs/gravitino-server-config.md | 2 +
19 files changed, 1309 insertions(+), 20 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/listener/JobEventDispatcher.java
b/core/src/main/java/org/apache/gravitino/listener/JobEventDispatcher.java
index 30e3b97be6..f2d40e49b7 100644
--- a/core/src/main/java/org/apache/gravitino/listener/JobEventDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/listener/JobEventDispatcher.java
@@ -31,18 +31,31 @@ import org.apache.gravitino.job.JobTemplateChange;
import org.apache.gravitino.listener.api.event.job.AlterJobTemplateEvent;
import
org.apache.gravitino.listener.api.event.job.AlterJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.AlterJobTemplatePreEvent;
+import org.apache.gravitino.listener.api.event.job.CancelJobEvent;
+import org.apache.gravitino.listener.api.event.job.CancelJobFailureEvent;
+import org.apache.gravitino.listener.api.event.job.CancelJobPreEvent;
import org.apache.gravitino.listener.api.event.job.DeleteJobTemplateEvent;
import
org.apache.gravitino.listener.api.event.job.DeleteJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.DeleteJobTemplatePreEvent;
+import org.apache.gravitino.listener.api.event.job.GetJobEvent;
+import org.apache.gravitino.listener.api.event.job.GetJobFailureEvent;
+import org.apache.gravitino.listener.api.event.job.GetJobPreEvent;
import org.apache.gravitino.listener.api.event.job.GetJobTemplateEvent;
import org.apache.gravitino.listener.api.event.job.GetJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.GetJobTemplatePreEvent;
import org.apache.gravitino.listener.api.event.job.ListJobTemplatesEvent;
import
org.apache.gravitino.listener.api.event.job.ListJobTemplatesFailureEvent;
import org.apache.gravitino.listener.api.event.job.ListJobTemplatesPreEvent;
+import org.apache.gravitino.listener.api.event.job.ListJobsEvent;
+import org.apache.gravitino.listener.api.event.job.ListJobsFailureEvent;
+import org.apache.gravitino.listener.api.event.job.ListJobsPreEvent;
import org.apache.gravitino.listener.api.event.job.RegisterJobTemplateEvent;
import
org.apache.gravitino.listener.api.event.job.RegisterJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.RegisterJobTemplatePreEvent;
+import org.apache.gravitino.listener.api.event.job.RunJobEvent;
+import org.apache.gravitino.listener.api.event.job.RunJobFailureEvent;
+import org.apache.gravitino.listener.api.event.job.RunJobPreEvent;
+import org.apache.gravitino.listener.api.info.JobInfo;
import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.utils.PrincipalUtils;
@@ -164,23 +177,81 @@ public class JobEventDispatcher implements
JobOperationDispatcher {
@Override
public List<JobEntity> listJobs(String metalake, java.util.Optional<String>
jobTemplateName)
throws NoSuchJobTemplateException {
- return jobOperationDispatcher.listJobs(metalake, jobTemplateName);
+ eventBus.dispatchEvent(
+ new ListJobsPreEvent(PrincipalUtils.getCurrentUserName(), metalake,
jobTemplateName));
+
+ try {
+ List<JobEntity> jobs = jobOperationDispatcher.listJobs(metalake,
jobTemplateName);
+ eventBus.dispatchEvent(
+ new ListJobsEvent(PrincipalUtils.getCurrentUserName(), metalake,
jobTemplateName));
+ return jobs;
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new ListJobsFailureEvent(
+ PrincipalUtils.getCurrentUserName(), metalake, jobTemplateName,
e));
+ throw e;
+ }
}
@Override
public JobEntity getJob(String metalake, String jobId) throws
NoSuchJobException {
- return jobOperationDispatcher.getJob(metalake, jobId);
+ eventBus.dispatchEvent(
+ new GetJobPreEvent(PrincipalUtils.getCurrentUserName(), metalake,
jobId));
+
+ try {
+ JobEntity job = jobOperationDispatcher.getJob(metalake, jobId);
+ eventBus.dispatchEvent(
+ new GetJobEvent(
+ PrincipalUtils.getCurrentUserName(), metalake,
JobInfo.fromJobEntity(job)));
+ return job;
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new GetJobFailureEvent(PrincipalUtils.getCurrentUserName(),
metalake, jobId, e));
+ throw e;
+ }
}
@Override
public JobEntity runJob(String metalake, String jobTemplateName, Map<String,
String> jobConf)
throws NoSuchJobTemplateException {
- return jobOperationDispatcher.runJob(metalake, jobTemplateName, jobConf);
+ eventBus.dispatchEvent(
+ new RunJobPreEvent(
+ PrincipalUtils.getCurrentUserName(), metalake, jobTemplateName,
jobConf));
+
+ try {
+ JobEntity job = jobOperationDispatcher.runJob(metalake, jobTemplateName,
jobConf);
+ eventBus.dispatchEvent(
+ new RunJobEvent(
+ PrincipalUtils.getCurrentUserName(),
+ metalake,
+ jobTemplateName,
+ jobConf,
+ JobInfo.fromJobEntity(job)));
+ return job;
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new RunJobFailureEvent(
+ PrincipalUtils.getCurrentUserName(), metalake, jobTemplateName,
jobConf, e));
+ throw e;
+ }
}
@Override
public JobEntity cancelJob(String metalake, String jobId) throws
NoSuchJobException {
- return jobOperationDispatcher.cancelJob(metalake, jobId);
+ eventBus.dispatchEvent(
+ new CancelJobPreEvent(PrincipalUtils.getCurrentUserName(), metalake,
jobId));
+
+ try {
+ JobEntity job = jobOperationDispatcher.cancelJob(metalake, jobId);
+ eventBus.dispatchEvent(
+ new CancelJobEvent(
+ PrincipalUtils.getCurrentUserName(), metalake,
JobInfo.fromJobEntity(job)));
+ return job;
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new CancelJobFailureEvent(PrincipalUtils.getCurrentUserName(),
metalake, jobId, e));
+ throw e;
+ }
}
@Override
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobEvent.java
new file mode 100644
index 0000000000..267df34744
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.listener.api.info.JobInfo;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered when a job has been successfully cancelled.
*/
+@DeveloperApi
+public class CancelJobEvent extends JobEvent {
+
+ private final JobInfo jobInfo;
+
+ /**
+ * Constructs a new {@code CancelJobEvent} instance.
+ *
+ * @param user The user who initiated the job cancellation operation.
+ * @param metalake The metalake name where the job resides.
+ * @param jobInfo The information of the job that has been cancelled.
+ */
+ public CancelJobEvent(String user, String metalake, JobInfo jobInfo) {
+ super(user, NameIdentifierUtil.ofJob(metalake, jobInfo.jobId()));
+ this.jobInfo = jobInfo;
+ }
+
+ /**
+ * Returns the information of the job that has been cancelled.
+ *
+ * @return the job information.
+ */
+ public JobInfo cancelledJobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.CANCEL_JOB;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobFailureEvent.java
new file mode 100644
index 0000000000..568be27753
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobFailureEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered when cancelling a job has failed. */
+@DeveloperApi
+public class CancelJobFailureEvent extends JobFailureEvent {
+
+ /**
+ * Constructs a new {@code CancelJobFailureEvent} instance.
+ *
+ * @param user The user who initiated the job cancellation operation.
+ * @param metalake The metalake name where the job resides.
+ * @param jobId The ID of the job that failed to be cancelled.
+ * @param exception The exception encountered during the job cancellation
operation, providing
+ * insights into the reasons behind the failure.
+ */
+ public CancelJobFailureEvent(String user, String metalake, String jobId,
Exception exception) {
+ super(user, NameIdentifierUtil.ofJob(metalake, jobId), exception);
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.CANCEL_JOB;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobPreEvent.java
new file mode 100644
index 0000000000..bc5f76bc05
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/CancelJobPreEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered before cancelling a job. */
+@DeveloperApi
+public class CancelJobPreEvent extends JobPreEvent {
+
+ /**
+ * Constructs a new {@code CancelJobPreEvent} instance.
+ *
+ * @param user The user who initiated the job cancellation operation.
+ * @param metalake The metalake name where the job resides.
+ * @param jobId The ID of the job to be cancelled.
+ */
+ public CancelJobPreEvent(String user, String metalake, String jobId) {
+ super(user, NameIdentifierUtil.ofJob(metalake, jobId));
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.CANCEL_JOB;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobEvent.java
new file mode 100644
index 0000000000..26cbc4239c
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.listener.api.info.JobInfo;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered when a job has been successfully loaded. */
+@DeveloperApi
+public class GetJobEvent extends JobEvent {
+
+ private final JobInfo jobInfo;
+
+ /**
+ * Constructs a new {@code GetJobEvent} instance.
+ *
+ * @param user The user who initiated the job retrieval operation.
+ * @param metalake The metalake name where the job resides.
+ * @param jobInfo The information of the job that has been loaded.
+ */
+ public GetJobEvent(String user, String metalake, JobInfo jobInfo) {
+ super(user, NameIdentifierUtil.ofJob(metalake, jobInfo.jobId()));
+ this.jobInfo = jobInfo;
+ }
+
+ /**
+ * Returns the information of the job that has been loaded.
+ *
+ * @return the job information.
+ */
+ public JobInfo loadedJobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.GET_JOB;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobFailureEvent.java
new file mode 100644
index 0000000000..4570021524
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobFailureEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered when the retrieval of a job has failed. */
+@DeveloperApi
+public class GetJobFailureEvent extends JobFailureEvent {
+
+ /**
+ * Constructs a new {@code GetJobFailureEvent} instance.
+ *
+ * @param user The user who initiated the job retrieval operation.
+ * @param metalake The metalake name where the job resides.
+ * @param jobId The ID of the job that failed to be retrieved.
+ * @param exception The exception encountered during the job retrieval,
providing insights into
+ * the reasons behind the failure.
+ */
+ public GetJobFailureEvent(String user, String metalake, String jobId,
Exception exception) {
+ super(user, NameIdentifierUtil.ofJob(metalake, jobId), exception);
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.GET_JOB;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobPreEvent.java
new file mode 100644
index 0000000000..f62cca5c80
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/GetJobPreEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered before the retrieval of a job. */
+@DeveloperApi
+public class GetJobPreEvent extends JobPreEvent {
+
+ /**
+ * Constructs a new {@code GetJobPreEvent} instance.
+ *
+ * @param user The user who initiated the job retrieval operation.
+ * @param metalake The metalake name where the job resides.
+ * @param jobId The ID of the job to be retrieved.
+ */
+ public GetJobPreEvent(String user, String metalake, String jobId) {
+ super(user, NameIdentifierUtil.ofJob(metalake, jobId));
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.GET_JOB;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobEvent.java
new file mode 100644
index 0000000000..600eb86ba7
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.OperationStatus;
+
+/**
+ * Represents an abstract base class for events related to job operations.
This class extends {@link
+ * Event} to provide a more specific context involving operations on jobs,
such as list, get, run, or
+ * cancel.
+ */
+@DeveloperApi
+public abstract class JobEvent extends Event {
+
+ /**
+ * Constructs a new {@code JobEvent} with the specified user and job
identifier.
+ *
+ * @param user The user responsible for triggering the job operation.
+ * @param identifier The identifier of the job involved in the operation.
+ */
+ protected JobEvent(String user, NameIdentifier identifier) {
+ super(user, identifier);
+ }
+
+ @Override
+ public OperationStatus operationStatus() {
+ return OperationStatus.SUCCESS;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobFailureEvent.java
new file mode 100644
index 0000000000..458ede729e
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobFailureEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+
+/**
+ * Represents an event triggered when an attempt to perform a job operation
fails due to an
+ * exception.
+ */
+@DeveloperApi
+public abstract class JobFailureEvent extends FailureEvent {
+
+ /**
+ * Constructs a new {@code JobFailureEvent} instance.
+ *
+ * @param user The user who initiated the job operation.
+ * @param identifier The identifier of the job involved in the operation.
+ * @param exception The exception encountered during the job operation,
providing insights into
+ * the reasons behind the failure.
+ */
+ protected JobFailureEvent(String user, NameIdentifier identifier, Exception
exception) {
+ super(user, identifier, exception);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobPreEvent.java
new file mode 100644
index 0000000000..658cb5abfd
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/JobPreEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.PreEvent;
+
+/** Represents a pre-event for job operations. */
+@DeveloperApi
+public abstract class JobPreEvent extends PreEvent {
+
+ /**
+ * Constructs a JobPreEvent.
+ *
+ * @param user the user associated with the event
+ * @param identifier the name identifier related to the job
+ */
+ protected JobPreEvent(String user, NameIdentifier identifier) {
+ super(user, identifier);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsEvent.java
new file mode 100644
index 0000000000..625987a678
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+
+/** Represents an event that is triggered upon the successful listing of jobs.
*/
+@DeveloperApi
+public final class ListJobsEvent extends JobEvent {
+
+ private final Optional<String> jobTemplateName;
+
+ /**
+ * Constructs an instance of {@code ListJobsEvent}.
+ *
+ * @param user The username of the individual who initiated the job listing.
+ * @param metalake The metalake name from which jobs were listed.
+ * @param jobTemplateName An optional job template name used to filter the
listed jobs.
+ */
+ public ListJobsEvent(String user, String metalake, Optional<String>
jobTemplateName) {
+ super(user, NameIdentifier.of(metalake));
+ this.jobTemplateName = jobTemplateName;
+ }
+
+ /**
+ * Returns the optional job template name used to filter the listed jobs.
+ *
+ * @return an {@code Optional} containing the job template name if provided,
otherwise an empty
+ * {@code Optional}.
+ */
+ public Optional<String> jobTemplateName() {
+ return jobTemplateName;
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.LIST_JOBS;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsFailureEvent.java
new file mode 100644
index 0000000000..6de23445c4
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsFailureEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+
+/** Represents an event triggered when an attempt to list jobs fails due to an
exception. */
+@DeveloperApi
+public class ListJobsFailureEvent extends JobFailureEvent {
+
+ private final Optional<String> jobTemplateName;
+
+ /**
+ * Constructs a new {@code ListJobsFailureEvent} instance.
+ *
+ * @param user The user who initiated the job listing operation.
+ * @param metalake The metalake name where the jobs are being listed.
+ * @param jobTemplateName The optional name of the job template used to
filter the listed jobs.
+ * @param exception The exception encountered during the job listing
operation, providing insights
+ * into the reasons behind the failure.
+ */
+ public ListJobsFailureEvent(
+ String user, String metalake, Optional<String> jobTemplateName,
Exception exception) {
+ super(user, NameIdentifier.of(metalake), exception);
+ this.jobTemplateName = jobTemplateName;
+ }
+
+ /**
+ * Returns the optional job template name used to filter the listed jobs.
+ *
+ * @return an {@code Optional} containing the job template name if provided,
otherwise an empty
+ * {@code Optional}.
+ */
+ public Optional<String> jobTemplateName() {
+ return jobTemplateName;
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return the operation type.
+ */
+ @Override
+ public OperationType operationType() {
+ return OperationType.LIST_JOBS;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsPreEvent.java
new file mode 100644
index 0000000000..d2d1539dba
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/ListJobsPreEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+
+/** Represents a pre-event that is generated before jobs are listed. */
+@DeveloperApi
+public class ListJobsPreEvent extends JobPreEvent {
+
+  private final Optional<String> jobTemplateName;
+
+  /**
+   * Creates a {@code ListJobsPreEvent}.
+   *
+   * @param user The user who requested the job listing.
+   * @param metalake The name of the metalake whose jobs are about to be listed; it is wrapped into
+   *     the {@link NameIdentifier} handed to the parent event.
+   * @param jobTemplateName The optional job template name used to filter the listed jobs.
+   */
+  public ListJobsPreEvent(String user, String metalake, Optional<String> jobTemplateName) {
+    super(user, NameIdentifier.of(metalake));
+    this.jobTemplateName = jobTemplateName;
+  }
+
+  /**
+   * Returns the type of operation.
+   *
+   * @return {@link OperationType#LIST_JOBS}.
+   */
+  @Override
+  public OperationType operationType() {
+    return OperationType.LIST_JOBS;
+  }
+
+  /**
+   * Returns the job template filter that was supplied with the listing request.
+   *
+   * @return the {@code Optional} given at construction time; empty when no filter was supplied.
+   */
+  public Optional<String> jobTemplateName() {
+    return jobTemplateName;
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobEvent.java
new file mode 100644
index 0000000000..d294eee9ab
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.listener.api.info.JobInfo;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event that is generated after a job has been successfully started/run. */
+@DeveloperApi
+public class RunJobEvent extends JobEvent {
+
+  private final String jobTemplateName;
+  private final Map<String, String> jobConf;
+  private final JobInfo jobInfo;
+
+  /**
+   * Creates a {@code RunJobEvent}.
+   *
+   * @param user The user who requested the job run.
+   * @param metalake The name of the metalake in which the job runs.
+   * @param jobTemplateName The name of the job template the job was started from.
+   * @param jobConf The runtime configuration of the job; its entries are copied into an immutable
+   *     map.
+   * @param jobInfo The information of the started job; its job ID is used to build the identifier
+   *     handed to the parent event.
+   */
+  public RunJobEvent(
+      String user,
+      String metalake,
+      String jobTemplateName,
+      Map<String, String> jobConf,
+      JobInfo jobInfo) {
+    super(user, NameIdentifierUtil.ofJob(metalake, jobInfo.jobId()));
+    this.jobInfo = jobInfo;
+    this.jobTemplateName = jobTemplateName;
+    this.jobConf = ImmutableMap.copyOf(jobConf);
+  }
+
+  /**
+   * Returns the type of operation.
+   *
+   * @return {@link OperationType#RUN_JOB}.
+   */
+  @Override
+  public OperationType operationType() {
+    return OperationType.RUN_JOB;
+  }
+
+  /**
+   * Returns the name of the job template the job was started from.
+   *
+   * @return the job template name.
+   */
+  public String jobTemplateName() {
+    return jobTemplateName;
+  }
+
+  /**
+   * Returns the runtime configuration the job was started with.
+   *
+   * @return the immutable copy of the job configuration taken at construction time.
+   */
+  public Map<String, String> jobConf() {
+    return jobConf;
+  }
+
+  /**
+   * Returns the information of the job that has been started.
+   *
+   * @return the job information.
+   */
+  public JobInfo runJobInfo() {
+    return jobInfo;
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobFailureEvent.java
new file mode 100644
index 0000000000..696d802d44
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobFailureEvent.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents an event triggered when running a job has failed. */
+@DeveloperApi
+public class RunJobFailureEvent extends JobFailureEvent {
+
+  private final String jobTemplateName;
+  private final Map<String, String> jobConf;
+
+  /**
+   * Constructs a new {@code RunJobFailureEvent} instance.
+   *
+   * @param user The user who initiated the job run operation.
+   * @param metalake The metalake name where the job was attempted to run.
+   * @param jobTemplateName The name of the job template that was used for running the job.
+   * @param jobConf The runtime configuration for the job. The entries are copied, so later changes
+   *     to the caller's map are not reflected in this event; {@code null} is kept as {@code null}.
+   * @param exception The exception encountered during the job run operation, providing insights
+   *     into the reasons behind the failure.
+   */
+  public RunJobFailureEvent(
+      String user,
+      String metalake,
+      String jobTemplateName,
+      Map<String, String> jobConf,
+      Exception exception) {
+    super(user, NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateName), exception);
+    this.jobTemplateName = jobTemplateName;
+    // Snapshot the configuration, as RunJobPreEvent and RunJobEvent do, so listeners neither see
+    // nor cause later mutations of the caller's map. A JDK copy is used instead of
+    // ImmutableMap.copyOf because it accepts a null map and null entries: building the failure
+    // event must never throw and hide the original exception.
+    this.jobConf =
+        jobConf == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(jobConf));
+  }
+
+  /**
+   * Returns the name of the job template that was used for running the job.
+   *
+   * @return the job template name.
+   */
+  public String jobTemplateName() {
+    return jobTemplateName;
+  }
+
+  /**
+   * Returns the runtime configuration for the job.
+   *
+   * @return an unmodifiable copy of the job configuration, or {@code null} if none was supplied.
+   */
+  public Map<String, String> jobConf() {
+    return jobConf;
+  }
+
+  /**
+   * Returns the type of operation.
+   *
+   * @return the operation type.
+   */
+  @Override
+  public OperationType operationType() {
+    return OperationType.RUN_JOB;
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobPreEvent.java
new file mode 100644
index 0000000000..eebad84e15
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/job/RunJobPreEvent.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event.job;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.event.OperationType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/** Represents a pre-event that is generated before a job is run. */
+@DeveloperApi
+public class RunJobPreEvent extends JobPreEvent {
+
+  private final String jobTemplateName;
+  private final Map<String, String> jobConf;
+
+  /**
+   * Creates a {@code RunJobPreEvent}.
+   *
+   * @param user The user who requested the job run.
+   * @param metalake The name of the metalake in which the job will run.
+   * @param jobTemplateName The name of the job template the job will be started from; together
+   *     with the metalake it forms the identifier handed to the parent event.
+   * @param jobConf The runtime configuration of the job; its entries are copied into an immutable
+   *     map.
+   */
+  public RunJobPreEvent(
+      String user, String metalake, String jobTemplateName, Map<String, String> jobConf) {
+    super(user, NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateName));
+    this.jobTemplateName = jobTemplateName;
+    this.jobConf = ImmutableMap.copyOf(jobConf);
+  }
+
+  /**
+   * Returns the type of operation.
+   *
+   * @return {@link OperationType#RUN_JOB}.
+   */
+  @Override
+  public OperationType operationType() {
+    return OperationType.RUN_JOB;
+  }
+
+  /**
+   * Returns the name of the job template the job will be started from.
+   *
+   * @return the job template name.
+   */
+  public String jobTemplateName() {
+    return jobTemplateName;
+  }
+
+  /**
+   * Returns the runtime configuration the job will be started with.
+   *
+   * @return the immutable copy of the job configuration taken at construction time.
+   */
+  public Map<String, String> jobConf() {
+    return jobConf;
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/info/JobInfo.java
b/core/src/main/java/org/apache/gravitino/listener/api/info/JobInfo.java
new file mode 100644
index 0000000000..156789b13b
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/listener/api/info/JobInfo.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.info;
+
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.meta.JobEntity;
+
+/**
+ * Represents information about a job, including its ID, template name,
status, and audit details.
+ */
+@DeveloperApi
+public final class JobInfo {
+
+ private final String jobId;
+
+ private final String jobTemplateName;
+
+ private final JobHandle.Status jobStatus;
+
+ private final Audit audit;
+
+ private JobInfo(String jobId, String jobTemplateName, JobHandle.Status
jobStatus, Audit audit) {
+ this.jobId = jobId;
+ this.jobTemplateName = jobTemplateName;
+ this.jobStatus = jobStatus;
+ this.audit = audit;
+ }
+
+ /**
+ * Creates a JobInfo instance from a JobEntity.
+ *
+ * @param jobEntity the JobEntity to convert
+ * @return a JobInfo instance containing information from the JobEntity
+ */
+ public static JobInfo fromJobEntity(JobEntity jobEntity) {
+ return new JobInfo(
+ jobEntity.name(), jobEntity.jobTemplateName(), jobEntity.status(),
jobEntity.auditInfo());
+ }
+
+ /**
+ * Returns the unique identifier of the job.
+ *
+ * @return the job ID
+ */
+ public String jobId() {
+ return jobId;
+ }
+
+ /**
+ * Returns the name of the job template associated with the job.
+ *
+ * @return the job template name
+ */
+ public String jobTemplateName() {
+ return jobTemplateName;
+ }
+
+ /**
+ * Returns the current status of the job.
+ *
+ * @return the job status
+ */
+ public JobHandle.Status jobStatus() {
+ return jobStatus;
+ }
+
+ /**
+ * Returns the audit information associated with the job.
+ *
+ * @return the audit information
+ */
+ public Audit auditInfo() {
+ return audit;
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestJobEventDispatcher.java
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestJobEventDispatcher.java
index 3133b1412b..19f1393d95 100644
---
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestJobEventDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestJobEventDispatcher.java
@@ -24,11 +24,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
+import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchJobException;
import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.job.JobHandle;
import org.apache.gravitino.job.JobOperationDispatcher;
import org.apache.gravitino.job.JobTemplate;
import org.apache.gravitino.job.JobTemplateChange;
@@ -38,19 +41,33 @@ import org.apache.gravitino.listener.JobEventDispatcher;
import org.apache.gravitino.listener.api.event.job.AlterJobTemplateEvent;
import
org.apache.gravitino.listener.api.event.job.AlterJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.AlterJobTemplatePreEvent;
+import org.apache.gravitino.listener.api.event.job.CancelJobEvent;
+import org.apache.gravitino.listener.api.event.job.CancelJobFailureEvent;
+import org.apache.gravitino.listener.api.event.job.CancelJobPreEvent;
import org.apache.gravitino.listener.api.event.job.DeleteJobTemplateEvent;
import
org.apache.gravitino.listener.api.event.job.DeleteJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.DeleteJobTemplatePreEvent;
+import org.apache.gravitino.listener.api.event.job.GetJobEvent;
+import org.apache.gravitino.listener.api.event.job.GetJobFailureEvent;
+import org.apache.gravitino.listener.api.event.job.GetJobPreEvent;
import org.apache.gravitino.listener.api.event.job.GetJobTemplateEvent;
import org.apache.gravitino.listener.api.event.job.GetJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.GetJobTemplatePreEvent;
import org.apache.gravitino.listener.api.event.job.ListJobTemplatesEvent;
import
org.apache.gravitino.listener.api.event.job.ListJobTemplatesFailureEvent;
import org.apache.gravitino.listener.api.event.job.ListJobTemplatesPreEvent;
+import org.apache.gravitino.listener.api.event.job.ListJobsEvent;
+import org.apache.gravitino.listener.api.event.job.ListJobsFailureEvent;
+import org.apache.gravitino.listener.api.event.job.ListJobsPreEvent;
import org.apache.gravitino.listener.api.event.job.RegisterJobTemplateEvent;
import
org.apache.gravitino.listener.api.event.job.RegisterJobTemplateFailureEvent;
import org.apache.gravitino.listener.api.event.job.RegisterJobTemplatePreEvent;
+import org.apache.gravitino.listener.api.event.job.RunJobEvent;
+import org.apache.gravitino.listener.api.event.job.RunJobFailureEvent;
+import org.apache.gravitino.listener.api.event.job.RunJobPreEvent;
+import org.apache.gravitino.listener.api.info.JobInfo;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.junit.jupiter.api.Assertions;
@@ -65,10 +82,16 @@ public class TestJobEventDispatcher {
private JobEventDispatcher dispatcher;
private DummyEventListener dummyEventListener;
private JobTemplateEntity jobTemplateEntity;
+ private JobEntity jobEntity;
+ private JobInfo jobInfo;
+ private Map<String, String> jobConf;
@BeforeAll
void init() {
this.jobTemplateEntity = mockJobTemplateEntity();
+ this.jobEntity = mockJobEntity();
+ this.jobInfo = mockJobInfo();
+ this.jobConf = Collections.singletonMap("key", "value");
this.dummyEventListener = new DummyEventListener();
EventBus eventBus = new
EventBus(Collections.singletonList(dummyEventListener));
JobOperationDispatcher jobExceptionDispatcher =
mockExceptionJobDispatcher();
@@ -281,26 +304,161 @@ public class TestJobEventDispatcher {
Assertions.assertEquals(OperationStatus.FAILURE, event.operationStatus());
}
- private JobTemplateEntity mockJobTemplateEntity() {
- JobTemplateEntity entity = mock(JobTemplateEntity.class);
- when(entity.name()).thenReturn("testJobTemplate");
- when(entity.comment()).thenReturn("test comment");
- when(entity.auditInfo()).thenReturn(mock(AuditInfo.class));
+ // Job-related event tests
+ @Test
+ void testListJobsEvent() {
+ dispatcher.listJobs("metalake", Optional.empty());
+ PreEvent preEvent = dummyEventListener.popPreEvent();
- JobTemplate jobTemplate = mock(JobTemplate.class);
- when(jobTemplate.name()).thenReturn("testJobTemplate");
- when(jobTemplate.comment()).thenReturn("test comment");
- when(entity.toJobTemplate()).thenReturn(jobTemplate);
+ Assertions.assertEquals("metalake",
Objects.requireNonNull(preEvent.identifier()).toString());
+ Assertions.assertInstanceOf(ListJobsPreEvent.class, preEvent);
+ Assertions.assertEquals(OperationType.LIST_JOBS, preEvent.operationType());
+ Assertions.assertEquals(OperationStatus.UNPROCESSED,
preEvent.operationStatus());
- return entity;
+ Event postEvent = dummyEventListener.popPostEvent();
+ Assertions.assertEquals("metalake",
Objects.requireNonNull(postEvent.identifier()).toString());
+ Assertions.assertInstanceOf(ListJobsEvent.class, postEvent);
+ Assertions.assertEquals(OperationType.LIST_JOBS,
postEvent.operationType());
+ Assertions.assertEquals(OperationStatus.SUCCESS,
postEvent.operationStatus());
}
- private JobOperationDispatcher mockExceptionJobDispatcher() {
- return mock(
- JobOperationDispatcher.class,
- invocation -> {
- throw new GravitinoRuntimeException("Exception for all methods");
- });
+  /** Verifies the pre- and post-events that are observed for a successful {@code getJob} call. */
+  @Test
+  void testGetJobEvent() {
+    String jobId = jobInfo.jobId();
+    dispatcher.getJob("metalake", jobId);
+
+    // Pre-event checks.
+    PreEvent preEvent = dummyEventListener.popPreEvent();
+    Assertions.assertEquals(
+        NameIdentifierUtil.ofJob("metalake", jobId), Objects.requireNonNull(preEvent.identifier()));
+    Assertions.assertInstanceOf(GetJobPreEvent.class, preEvent);
+    Assertions.assertEquals(OperationType.GET_JOB, preEvent.operationType());
+    Assertions.assertEquals(OperationStatus.UNPROCESSED, preEvent.operationStatus());
+
+    // Post-event checks.
+    Event postEvent = dummyEventListener.popPostEvent();
+    Assertions.assertEquals(
+        NameIdentifierUtil.ofJob("metalake", jobId),
+        Objects.requireNonNull(postEvent.identifier()));
+    GetJobEvent getJobEvent = Assertions.assertInstanceOf(GetJobEvent.class, postEvent);
+    Assertions.assertEquals(OperationType.GET_JOB, postEvent.operationType());
+    Assertions.assertEquals(OperationStatus.SUCCESS, postEvent.operationStatus());
+
+    checkJobInfo(getJobEvent.loadedJobInfo(), jobInfo);
+  }
+
+  /** Verifies the pre- and post-events that are observed for a successful {@code runJob} call. */
+  @Test
+  void testRunJobEvent() {
+    String expectedTemplateName = jobTemplateEntity.name();
+    dispatcher.runJob("metalake", expectedTemplateName, jobConf);
+
+    // Pre-event checks: the event is identified by the job template.
+    PreEvent preEvent = dummyEventListener.popPreEvent();
+    Assertions.assertEquals(
+        NameIdentifierUtil.ofJobTemplate("metalake", expectedTemplateName),
+        Objects.requireNonNull(preEvent.identifier()));
+    RunJobPreEvent runJobPreEvent = Assertions.assertInstanceOf(RunJobPreEvent.class, preEvent);
+    Assertions.assertEquals(OperationType.RUN_JOB, preEvent.operationType());
+    Assertions.assertEquals(OperationStatus.UNPROCESSED, preEvent.operationStatus());
+    Assertions.assertEquals(expectedTemplateName, runJobPreEvent.jobTemplateName());
+    Assertions.assertEquals(jobConf, runJobPreEvent.jobConf());
+
+    // Post-event checks: the event is identified by the started job.
+    Event postEvent = dummyEventListener.popPostEvent();
+    Assertions.assertEquals(
+        NameIdentifierUtil.ofJob("metalake", jobInfo.jobId()),
+        Objects.requireNonNull(postEvent.identifier()));
+    RunJobEvent runJobEvent = Assertions.assertInstanceOf(RunJobEvent.class, postEvent);
+    Assertions.assertEquals(OperationType.RUN_JOB, postEvent.operationType());
+    Assertions.assertEquals(OperationStatus.SUCCESS, postEvent.operationStatus());
+
+    checkJobInfo(runJobEvent.runJobInfo(), jobInfo);
+    Assertions.assertEquals(expectedTemplateName, runJobEvent.jobTemplateName());
+    Assertions.assertEquals(jobConf, runJobEvent.jobConf());
+  }
+
+  /** Verifies the pre- and post-events observed for a successful {@code cancelJob} call. */
+  @Test
+  void testCancelJobEvent() {
+    String jobId = jobInfo.jobId();
+    dispatcher.cancelJob("metalake", jobId);
+
+    // Pre-event checks.
+    PreEvent preEvent = dummyEventListener.popPreEvent();
+    Assertions.assertEquals(
+        NameIdentifierUtil.ofJob("metalake", jobId), Objects.requireNonNull(preEvent.identifier()));
+    Assertions.assertInstanceOf(CancelJobPreEvent.class, preEvent);
+    Assertions.assertEquals(OperationType.CANCEL_JOB, preEvent.operationType());
+    Assertions.assertEquals(OperationStatus.UNPROCESSED, preEvent.operationStatus());
+
+    // Post-event checks.
+    Event postEvent = dummyEventListener.popPostEvent();
+    Assertions.assertEquals(
+        NameIdentifierUtil.ofJob("metalake", jobId),
+        Objects.requireNonNull(postEvent.identifier()));
+    CancelJobEvent cancelJobEvent = Assertions.assertInstanceOf(CancelJobEvent.class, postEvent);
+    Assertions.assertEquals(OperationType.CANCEL_JOB, postEvent.operationType());
+    Assertions.assertEquals(OperationStatus.SUCCESS, postEvent.operationStatus());
+
+    checkJobInfo(cancelJobEvent.cancelledJobInfo(), jobInfo);
+  }
+
+ // Job-related failure event tests
+  /** Verifies the failure event observed when {@code listJobs} throws. */
+  @Test
+  void testListJobsFailureEvent() {
+    Assertions.assertThrowsExactly(
+        GravitinoRuntimeException.class,
+        () -> failureDispatcher.listJobs("metalake", Optional.empty()));
+
+    Event event = dummyEventListener.popPostEvent();
+    ListJobsFailureEvent failureEvent =
+        Assertions.assertInstanceOf(ListJobsFailureEvent.class, event);
+    Assertions.assertEquals(GravitinoRuntimeException.class, failureEvent.exception().getClass());
+    Assertions.assertEquals(OperationType.LIST_JOBS, event.operationType());
+    Assertions.assertEquals(OperationStatus.FAILURE, event.operationStatus());
+  }
+
+  /** Verifies the failure event observed when {@code getJob} throws. */
+  @Test
+  void testGetJobFailureEvent() {
+    Assertions.assertThrowsExactly(
+        GravitinoRuntimeException.class,
+        () -> failureDispatcher.getJob("metalake", jobInfo.jobId()));
+
+    Event event = dummyEventListener.popPostEvent();
+    GetJobFailureEvent failureEvent = Assertions.assertInstanceOf(GetJobFailureEvent.class, event);
+    Assertions.assertEquals(GravitinoRuntimeException.class, failureEvent.exception().getClass());
+    Assertions.assertEquals(OperationType.GET_JOB, event.operationType());
+    Assertions.assertEquals(OperationStatus.FAILURE, event.operationStatus());
+  }
+
+  /** Verifies the failure event observed when {@code runJob} throws. */
+  @Test
+  void testRunJobFailureEvent() {
+    Assertions.assertThrowsExactly(
+        GravitinoRuntimeException.class,
+        () -> failureDispatcher.runJob("metalake", jobTemplateEntity.name(), jobConf));
+
+    Event event = dummyEventListener.popPostEvent();
+    RunJobFailureEvent failureEvent = Assertions.assertInstanceOf(RunJobFailureEvent.class, event);
+    Assertions.assertEquals(GravitinoRuntimeException.class, failureEvent.exception().getClass());
+    Assertions.assertEquals(OperationType.RUN_JOB, event.operationType());
+    Assertions.assertEquals(OperationStatus.FAILURE, event.operationStatus());
+
+    // The failure event carries the request arguments.
+    Assertions.assertEquals(jobTemplateEntity.name(), failureEvent.jobTemplateName());
+    Assertions.assertEquals(jobConf, failureEvent.jobConf());
+  }
+
+  /** Verifies the failure event observed when {@code cancelJob} throws. */
+  @Test
+  void testCancelJobFailureEvent() {
+    Assertions.assertThrowsExactly(
+        GravitinoRuntimeException.class,
+        () -> failureDispatcher.cancelJob("metalake", jobInfo.jobId()));
+
+    Event event = dummyEventListener.popPostEvent();
+    CancelJobFailureEvent failureEvent =
+        Assertions.assertInstanceOf(CancelJobFailureEvent.class, event);
+    Assertions.assertEquals(GravitinoRuntimeException.class, failureEvent.exception().getClass());
+    Assertions.assertEquals(OperationType.CANCEL_JOB, event.operationType());
+    Assertions.assertEquals(OperationStatus.FAILURE, event.operationStatus());
+  }
private void checkJobTemplate(JobTemplate actual, JobTemplateEntity
expected) {
@@ -308,11 +466,18 @@ public class TestJobEventDispatcher {
Assertions.assertEquals(expected.comment(), actual.comment());
}
+  /**
+   * Asserts that two {@link JobInfo} objects agree on job ID, job template name and job status.
+   * The audit information is not compared.
+   *
+   * @param actual the job information taken from the event under test
+   * @param expected the job information the event is expected to carry
+   */
+  private void checkJobInfo(JobInfo actual, JobInfo expected) {
+    Assertions.assertEquals(expected.jobId(), actual.jobId());
+    Assertions.assertEquals(expected.jobTemplateName(), actual.jobTemplateName());
+    Assertions.assertEquals(expected.jobStatus(), actual.jobStatus());
+  }
+
private JobOperationDispatcher mockJobDispatcher() {
JobOperationDispatcher dispatcher = mock(JobOperationDispatcher.class);
String metalake = "metalake";
try {
+ // Job template operations
when(dispatcher.listJobTemplates(metalake))
.thenReturn(Collections.singletonList(jobTemplateEntity));
when(dispatcher.getJobTemplate(any(String.class), any(String.class)))
@@ -321,6 +486,14 @@ public class TestJobEventDispatcher {
when(dispatcher.alterJobTemplate(
any(String.class), any(String.class),
any(JobTemplateChange[].class)))
.thenReturn(jobTemplateEntity);
+
+ // Job operations
+ when(dispatcher.listJobs(any(String.class), any(Optional.class)))
+ .thenReturn(Collections.singletonList(jobEntity));
+ when(dispatcher.getJob(any(String.class),
any(String.class))).thenReturn(jobEntity);
+ when(dispatcher.runJob(any(String.class), any(String.class),
any(Map.class)))
+ .thenReturn(jobEntity);
+ when(dispatcher.cancelJob(any(String.class),
any(String.class))).thenReturn(jobEntity);
} catch (JobTemplateAlreadyExistsException
| NoSuchJobTemplateException
| NoSuchJobException e) {
@@ -329,4 +502,44 @@ public class TestJobEventDispatcher {
return dispatcher;
}
+
+  /**
+   * Creates a mocked {@link JobTemplateEntity} named {@code testJobTemplate} whose {@code
+   * toJobTemplate()} returns a mocked {@link JobTemplate} with the same name and comment.
+   *
+   * @return the stubbed job template entity
+   */
+  private JobTemplateEntity mockJobTemplateEntity() {
+    AuditInfo auditInfo = mock(AuditInfo.class);
+
+    JobTemplate template = mock(JobTemplate.class);
+    when(template.name()).thenReturn("testJobTemplate");
+    when(template.comment()).thenReturn("test comment");
+
+    JobTemplateEntity templateEntity = mock(JobTemplateEntity.class);
+    when(templateEntity.name()).thenReturn("testJobTemplate");
+    when(templateEntity.comment()).thenReturn("test comment");
+    when(templateEntity.auditInfo()).thenReturn(auditInfo);
+    when(templateEntity.toJobTemplate()).thenReturn(template);
+
+    return templateEntity;
+  }
+
+ /**
+  * Creates a mocked {@link JobOperationDispatcher} whose default answer throws a {@link
+  * GravitinoRuntimeException}, so every call that is not stubbed otherwise fails.
+  *
+  * @return the always-failing dispatcher mock
+  */
+ private JobOperationDispatcher mockExceptionJobDispatcher() {
+ return mock(
+ JobOperationDispatcher.class,
+ // Default answer applied to each invocation on the mock.
+ invocation -> {
+ throw new GravitinoRuntimeException("Exception for all methods");
+ });
+ }
+
+  /**
+   * Creates a mocked {@link JobEntity} named {@code job-12345} that belongs to the job template
+   * {@code testJob} and reports the status {@code SUCCEEDED}.
+   *
+   * @return the stubbed job entity
+   */
+  private JobEntity mockJobEntity() {
+    AuditInfo auditInfo = mock(AuditInfo.class);
+
+    JobEntity mockedJob = mock(JobEntity.class);
+    when(mockedJob.name()).thenReturn("job-12345");
+    when(mockedJob.jobTemplateName()).thenReturn("testJob");
+    when(mockedJob.status()).thenReturn(JobHandle.Status.SUCCEEDED);
+    when(mockedJob.auditInfo()).thenReturn(auditInfo);
+
+    return mockedJob;
+  }
+
+  /**
+   * Creates the {@link JobInfo} the job events are expected to carry: job ID {@code job-12345},
+   * job template {@code testJob} and status {@code SUCCEEDED}.
+   *
+   * @return the expected job information
+   */
+  private JobInfo mockJobInfo() {
+    // JobInfo is a final value class, so create a real instance through its factory instead of
+    // mocking it. This does not depend on Mockito's inline mock maker and takes its values from
+    // the same stubs that mockJobEntity() defines.
+    return JobInfo.fromJobEntity(mockJobEntity());
+  }
}
diff --git a/docs/gravitino-server-config.md b/docs/gravitino-server-config.md
index b3aaec9fca..3da93fda7d 100644
--- a/docs/gravitino-server-config.md
+++ b/docs/gravitino-server-config.md
@@ -188,6 +188,7 @@ Gravitino triggers a pre-event before the operation, a
post-event after the comp
| role operation | `CreateRoleEvent`,
`DeleteRoleEvent`, `GetRoleEvent`, `GrantPrivilegesEvent`,
`ListRoleNamesEvent`, `RevokePrivilegesEvent`, `CreateRoleFailureEvent`,
`DeleteRoleFailureEvent`, `GetRoleFailureEvent`, `GrantPrivilegesFailureEvent`,
`ListRoleNamesFailureEvent`, `RevokePrivilegesFailureEvent`
[...]
| owner operation | `SetOwnerEvent`, `GetOwnerEvent`
[...]
| Gravitino server job template operation | `RegisterJobTemplateEvent`,
`GetJobTemplateEvent`, `ListJobTemplatesEvent`, `AlterJobTemplateEvent`,
`DeleteJobTemplateEvent`, `RegisterJobTemplateFailureEvent`,
`GetJobTemplateFailureEvent`, `ListJobTemplatesFailureEvent`,
`AlterJobTemplateFailureEvent`, `DeleteJobTemplateFailureEvent`
[...]
+| Gravitino server job operation | `RunJobEvent`, `GetJobEvent`,
`ListJobsEvent`, `CancelJobEvent`, `RunJobFailureEvent`, `GetJobFailureEvent`,
`ListJobsFailureEvent`, `CancelJobFailureEvent`
[...]
##### Pre-event
@@ -207,6 +208,7 @@ Gravitino triggers a pre-event before the operation, a
post-event after the comp
| Gravitino server role operation | `CreateRolePreEvent`,
`DeleteRolePreEvent`, `GetRolePreEvent`, `GrantPrivilegesPreEvent`,
`ListRoleNamesPreEvent`, `RevokePrivilegesPreEvent`
|
0.9.0-incubating |
| Gravitino server owner operation | `SetOwnerPreEvent`,
`GetOwnerPreEvent`
| 1.0.0 |
| Gravitino server job template operation | `RegisterJobTemplatePreEvent`,
`GetJobTemplatePreEvent`, `ListJobTemplatesPreEvent`,
`AlterJobTemplatePreEvent`, `DeleteJobTemplatePreEvent`
| 1.0.1
|
+| Gravitino server job operation | `RunJobPreEvent`,
`GetJobPreEvent`, `ListJobsPreEvent`, `CancelJobPreEvent`
| 1.0.1 |
#### Event listener plugin