OOZIE-2509 SLA job status can get stuck in running state
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ba7a7b85 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ba7a7b85 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ba7a7b85 Branch: refs/heads/master Commit: ba7a7b85e040a313fa107474768dd67a325f91d5 Parents: 5fbd3eb Author: Purshotam Shah <[email protected]> Authored: Tue May 17 15:08:35 2016 -0700 Committer: Purshotam Shah <[email protected]> Committed: Tue May 17 15:08:35 2016 -0700 ---------------------------------------------------------------------- .../command/coord/CoordActionCheckXCommand.java | 6 +- .../sla/SLACoordActionJobEventXCommand.java | 80 ++ .../sla/SLACoordActionJobHistoryXCommand.java | 78 ++ .../oozie/command/sla/SLAJobEventXCommand.java | 301 ++++++ .../command/sla/SLAJobHistoryXCommand.java | 127 +++ .../sla/SLAWorkflowActionJobEventXCommand.java | 62 ++ .../SLAWorkflowActionJobHistoryXCommand.java | 57 ++ .../sla/SLAWorkflowJobEventXCommand.java | 64 ++ .../sla/SLAWorkflowJobHistoryXCommand.java | 56 ++ .../jpa/CoordActionGetForSLAJPAExecutor.java | 82 -- .../executor/jpa/CoordActionQueryExecutor.java | 13 +- .../executor/jpa/SLASummaryQueryExecutor.java | 9 - .../jpa/WorkflowActionGetForSLAJPAExecutor.java | 78 -- .../jpa/WorkflowActionQueryExecutor.java | 13 +- .../jpa/WorkflowJobGetForSLAJPAExecutor.java | 78 -- .../executor/jpa/WorkflowJobQueryExecutor.java | 13 +- .../oozie/service/ConfigurationService.java | 12 +- .../org/apache/oozie/sla/SLACalcStatus.java | 54 - .../apache/oozie/sla/SLACalculatorMemory.java | 989 +++---------------- .../org/apache/oozie/sla/SLASummaryBean.java | 2 - .../apache/oozie/sla/SLAXCommandFactory.java | 92 ++ .../coord/TestCoordActionsKillXCommand.java | 11 +- .../jpa/TestSLASummaryQueryExecutor.java | 2 +- .../apache/oozie/service/TestHASLAService.java | 64 +- .../oozie/sla/TestSLACalculatorMemory.java | 344 +++++-- .../oozie/sla/TestSLAEventGeneration.java | 1 + 
.../oozie/sla/TestSLAJobEventListener.java | 136 ++- .../org/apache/oozie/sla/TestSLAService.java | 123 ++- release-log.txt | 1 + 29 files changed, 1668 insertions(+), 1280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java index 128feb2..bdbbd24 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java @@ -47,9 +47,10 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; /** * The command checks workflow status for coordinator action. 
@@ -177,7 +178,8 @@ public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor( coordAction.getJobId())); - workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId())); + workflowJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, + coordAction.getExternalId()); LogUtils.setLogInfo(coordAction); } else { http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java new file mode 100644 index 0000000..dfe2637 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.sla; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.sla.SLACalcStatus; +import org.apache.oozie.util.LogUtils; + +public class SLACoordActionJobEventXCommand extends SLAJobEventXCommand { + CoordinatorActionBean ca; + WorkflowJobBean wf; + + public SLACoordActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) { + super(slaCalc, lockTimeOut); + } + + @Override + protected void loadState() throws CommandException { + try { + ca = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, slaCalc.getId()); + if (ca.getExternalId() != null) { + wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, ca.getExternalId()); + } + LogUtils.setLogInfo(ca); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + } + + + protected void updateJobInfo() { + if (ca.isTerminalStatus()) { + setEnded(true); + setEndMiss(ca.isTerminalWithFailure()); + slaCalc.setActualEnd(ca.getLastModifiedTime()); + if (wf != null) { + if (wf.getEndTime() != null) { + if (slaCalc.getExpectedEnd() != null + && wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { + setEndMiss(true); + } + slaCalc.setActualEnd(wf.getEndTime()); + } + slaCalc.setActualStart(wf.getStartTime()); + } + } + else { + if (wf != null) { + slaCalc.setActualStart(wf.getStartTime()); + } + } + slaCalc.setJobStatus(ca.getStatusStr()); + } + + +} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java new file mode 100644 index 0000000..b7f09d3 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.sla; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.util.LogUtils; + +public class SLACoordActionJobHistoryXCommand extends SLAJobHistoryXCommand { + + CoordinatorActionBean cAction = null; + + public SLACoordActionJobHistoryXCommand(String jobId) { + super(jobId); + } + + + protected void loadState() throws CommandException { + try { + cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, jobId); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + LogUtils.setLogInfo(cAction); + } + + protected void updateSLASummary() throws CommandException { + try { + updateSLASummaryForCoordAction(cAction); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + + } + + protected void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException { + String wrkflowId = bean.getExternalId(); + if (wrkflowId != null) { + WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get( + WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId); + if (wrkflow != null) { + updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime(), bean.getStatusStr()); + } + } + else{ + updateSLASummary(bean.getId(), null, bean.getLastModifiedTime(), bean.getStatusStr()); + } + } + + @Override + protected boolean isJobEnded() { + return cAction.isTerminalStatus(); + } +} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java new file mode 100644 index 0000000..9b18606 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.sla; + +import java.util.Date; + +import org.apache.oozie.XException; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.event.SLAEvent.EventStatus; +import org.apache.oozie.client.event.SLAEvent.SLAStatus; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.XCommand; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; +import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; +import org.apache.oozie.service.EventHandlerService; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLACalcStatus; +import org.apache.oozie.sla.SLASummaryBean; + +public abstract class SLAJobEventXCommand extends XCommand<Void> { + private long lockTimeOut = 0 ; + JPAService jpaService = Services.get().get(JPAService.class); + SLACalcStatus slaCalc; + final static String SLA_LOCK_PREFIX = "sla_"; + private boolean isEnded = false; + private boolean isEndMiss = false; + + public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) { + super("SLA.job.event", "SLA.job.event", 1); + this.slaCalc = slaCalc; + this.lockTimeOut = lockTimeOut; + } + + @Override + protected boolean isLockRequired() { + return true; + } + + @Override + protected boolean isReQueueRequired() { + return false; + } + + @Override + public String getEntityKey() { + return SLA_LOCK_PREFIX + slaCalc.getId(); + } + + protected long getLockTimeOut() { + return lockTimeOut; + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + + @Override + protected Void execute() throws CommandException { + updateJobInfo(); + if (isEnded) { + processForEnd(); + } + else { + processForRunning(); + } + try { + writeToDB(); + } + catch (XException e) { + throw new 
CommandException(e); + } + return null; + } + + /** + * Verify job. + */ + protected abstract void updateJobInfo(); + + /** + * Should alert. + * + * @param slaObj the sla obj + * @return true, if successful + */ + private boolean shouldAlert(SLACalcStatus slaObj) { + return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT); + } + + /** + * Queue event. + * + * @param event the event + */ + private void queueEvent(SLACalcStatus event) { + Services.get().get(EventHandlerService.class).queueEvent(event); + } + + /** + * Process duration sla. + * + * @param expected the expected + * @param actual the actual + * @param slaCalc the sla calc + */ + private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) { + if (expected != -1) { + if (actual > expected) { + slaCalc.setEventStatus(EventStatus.DURATION_MISS); + } + else if (actual <= expected) { + slaCalc.setEventStatus(EventStatus.DURATION_MET); + } + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + } + } + + + /** + * WriteSLA object to DB. 
+ * + * @throws JPAExecutorException the JPA executor exception + */ + private void writeToDB() throws JPAExecutorException { + byte eventProc = slaCalc.getEventProcessed(); + // no more processing, no transfer to history set + if (slaCalc.getEventProcessed() >= 8) { + slaCalc.setEventProcessed(8); + } + + SLASummaryBean slaSummaryBean = new SLASummaryBean(); + slaSummaryBean.setId(slaCalc.getId()); + slaSummaryBean.setEventProcessed(eventProc); + slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); + slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); + slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); + slaSummaryBean.setActualStart(slaCalc.getActualStart()); + slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); + slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); + slaSummaryBean.setLastModifiedTime(new Date()); + + SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, + slaSummaryBean); + + LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(), + slaCalc.getEventProcessed(), slaCalc.getJobStatus()); + + } + + /** + * Process for end. + */ + private void processForEnd() { + byte eventProc = slaCalc.getEventProcessed(); + + LOG.debug("Job {0} has ended. 
endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd()); + if (isEndMiss()) { + slaCalc.setSLAStatus(SLAStatus.MISS); + } + else { + slaCalc.setSLAStatus(SLAStatus.MET); + } + if (eventProc != 8 && slaCalc.getActualStart() != null) { + if ((eventProc & 1) == 0) { + if (slaCalc.getExpectedStart() != null) { + if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) { + slaCalc.setEventStatus(EventStatus.START_MISS); + } + else { + slaCalc.setEventStatus(EventStatus.START_MET); + } + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + } + } + slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime()); + if (((eventProc >> 1) & 1) == 0) { + processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc); + } + } + if (eventProc != 8 && eventProc < 4) { + if (isEndMiss()) { + slaCalc.setEventStatus(EventStatus.END_MISS); + } + else { + slaCalc.setEventStatus(EventStatus.END_MET); + } + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + } + slaCalc.setEventProcessed(8); + } + + /** + * Process for running. 
+ */ + private void processForRunning() { + byte eventProc = slaCalc.getEventProcessed(); + + if (eventProc != 8 && slaCalc.getActualStart() != null) { + slaCalc.setSLAStatus(SLAStatus.IN_PROCESS); + } + if (eventProc != 8 && (eventProc & 1) == 0) { + if (slaCalc.getExpectedStart() == null) { + eventProc++; + } + else if (slaCalc.getActualStart() != null) { + if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) { + slaCalc.setEventStatus(EventStatus.START_MISS); + } + else { + slaCalc.setEventStatus(EventStatus.START_MET); + } + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + eventProc++; + } + else if (slaCalc.getExpectedStart() != null + && slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) { + slaCalc.setEventStatus(EventStatus.START_MISS); + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + eventProc++; + } + + } + if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { + if (slaCalc.getExpectedDuration() == -1) { + eventProc += 2; + } + else if (slaCalc.getActualStart() != null && slaCalc.getExpectedDuration() != -1) { + if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) { + slaCalc.setEventStatus(EventStatus.DURATION_MISS); + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + eventProc += 2; + } + } + } + if (eventProc < 4) { + if (slaCalc.getExpectedEnd() != null) { + if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { + slaCalc.setEventStatus(EventStatus.END_MISS); + slaCalc.setSLAStatus(SLAStatus.MISS); + if (shouldAlert(slaCalc)) { + queueEvent(new SLACalcStatus(slaCalc)); + } + eventProc += 4; + } + } + else { + eventProc += 4; + } + } + slaCalc.setEventProcessed(eventProc); + } + + public boolean isEnded() { + return isEnded; + } + + public void setEnded(boolean isEnded) { + this.isEnded = isEnded; + } + + public boolean isEndMiss() { + return isEndMiss; + } + + public 
void setEndMiss(boolean isEndMiss) { + this.isEndMiss = isEndMiss; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java new file mode 100644 index 0000000..0b4045a --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.sla; + +import java.util.Date; + +import org.apache.oozie.XException; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.XCommand; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; +import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; +import org.apache.oozie.sla.SLASummaryBean; + +public abstract class SLAJobHistoryXCommand extends XCommand<Boolean> { + + protected String jobId; + + public SLAJobHistoryXCommand(String jobId) { + super("SLAJobHistoryXCommand", "SLAJobHistoryXCommand", 1); + this.jobId = jobId; + + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + @Override + protected boolean isLockRequired() { + return true; + } + + @Override + protected boolean isReQueueRequired() { + return false; + } + + @Override + public String getEntityKey() { + return SLAJobEventXCommand.SLA_LOCK_PREFIX + jobId; + } + + protected long getLockTimeOut() { + return 0L; + } + + protected Boolean execute() throws CommandException { + if (isJobEnded()) { + try { + updateSLASummary(); + } + catch (XException e) { + throw new CommandException(e); + } + return true; + } + else { + LOG.debug("Job [{0}] is not finished", jobId); + } + return false; + + } + + /** + * Checks if is job ended. + * + * @return true, if is job ended + */ + protected abstract boolean isJobEnded(); + + /** + * Update SLASummary + * + */ + protected abstract void updateSLASummary() throws CommandException, XException; + + /** + * Update sla summary. 
+ * + * @param id the id + * @param startTime the start time + * @param endTime the end time + * @param status the status + * @throws JPAExecutorException the JPA executor exception + */ + protected void updateSLASummary(String id, Date startTime, Date endTime, String status) throws JPAExecutorException { + SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id); + if (sla.getJobStatus().equals(status) && sla.getEventProcessed() == 8) { + LOG.debug("SLA job is already updated", sla.getId(), sla.getEventProcessed(), sla.getJobStatus()); + return; + } + if (sla != null) { + sla.setActualStart(startTime); + sla.setActualEnd(endTime); + if (startTime != null && endTime != null) { + sla.setActualDuration(endTime.getTime() - startTime.getTime()); + } + sla.setLastModifiedTime(new Date()); + sla.setEventProcessed(8); + sla.setJobStatus(status); + SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, + sla); + LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", sla.getId(), + sla.getEventProcessed(), sla.getJobStatus()); + + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java new file mode 100644 index 0000000..fef77ae --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.sla; + +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; +import org.apache.oozie.sla.SLACalcStatus; +import org.apache.oozie.util.LogUtils; + +public class SLAWorkflowActionJobEventXCommand extends SLAJobEventXCommand { + WorkflowActionBean wa; + + public SLAWorkflowActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) { + super(slaCalc, lockTimeOut); + } + + @Override + protected void loadState() throws CommandException { + try { + wa = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_FOR_SLA, slaCalc.getId()); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + LogUtils.setLogInfo(wa); + + } + + + @Override + protected void updateJobInfo() { + if (wa.getEndTime() != null) { + setEnded(true); + if (wa.isTerminalWithFailure() || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { + setEndMiss(true); + } + } + slaCalc.setActualStart(wa.getStartTime()); + slaCalc.setActualEnd(wa.getEndTime()); + slaCalc.setJobStatus(wa.getStatusStr()); + } + +} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java new file mode 100644 index 0000000..7dc4a3c --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.sla; + +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.XException; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.util.LogUtils; + +public class SLAWorkflowActionJobHistoryXCommand extends SLAJobHistoryXCommand { + + WorkflowActionBean wfAction = null; + + public SLAWorkflowActionJobHistoryXCommand(String jobId) { + super(jobId); + } + + protected void loadState() throws CommandException { + + try { + wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_COMPLETED, jobId); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + LogUtils.setLogInfo(wfAction); + } + + protected void updateSLASummary() throws XException { + updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr()); + + } + + @Override + protected boolean isJobEnded() { + return wfAction.isComplete() || wfAction.isTerminalWithFailure(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java new file mode 100644 index 0000000..9a72617 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.sla; + +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.sla.SLACalcStatus; +import org.apache.oozie.util.LogUtils; + +public class SLAWorkflowJobEventXCommand extends SLAJobEventXCommand { + WorkflowJobBean wf; + + public SLAWorkflowJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) { + super(slaCalc, lockTimeOut); + } + + @Override + protected void loadState() throws CommandException { + try { + wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, slaCalc.getId()); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + LogUtils.setLogInfo(wf); + + } + + + @Override + protected void updateJobInfo() { + if (wf.inTerminalState()) { + setEnded(true); + if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED + || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { + setEndMiss(true); + } + 
slaCalc.setActualEnd(wf.getEndTime()); + } + slaCalc.setActualStart(wf.getStartTime()); + slaCalc.setJobStatus(wf.getStatusStr()); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java new file mode 100644 index 0000000..79e45ee --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.sla; + +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.XException; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.util.LogUtils; + +public class SLAWorkflowJobHistoryXCommand extends SLAJobHistoryXCommand { + + WorkflowJobBean wfJob = null; + + public SLAWorkflowJobHistoryXCommand(String jobId) { + super(jobId); + } + + + protected void loadState() throws CommandException { + try { + wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId); + } + catch (JPAExecutorException e) { + throw new CommandException(e); + } + LogUtils.setLogInfo(wfJob); + } + + protected void updateSLASummary() throws XException { + updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr()); + } + + @Override + protected boolean isJobEnded() { + return wfJob.inTerminalState(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java deleted file mode 100644 index 8a5b997..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.ParamChecker; - -/** - * JPAExecutor to get attributes of CoordinatorActionBean required by SLAService on restart - */ -public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> { - - private String coordActionId; - - public CoordActionGetForSLAJPAExecutor(String coordActionId) { - ParamChecker.notNull(coordActionId, "coordActionId"); - this.coordActionId = coordActionId; - } - - @Override - public String getName() { - return "CoordActionGetForSLAJPAExecutor"; - } - - @Override - public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException { - try { - Query q = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA"); - q.setParameter("id", coordActionId); - Object[] obj = (Object[]) q.getSingleResult(); - CoordinatorActionBean caBean = getBeanForRunningCoordAction(obj); - return caBean; - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - - } - - private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) { - 
CoordinatorActionBean bean = new CoordinatorActionBean(); - if (arr[0] != null) { - bean.setId((String) arr[0]); - } - if (arr[1] != null) { - bean.setJobId((String) arr[1]); - } - if (arr[2] != null) { - bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2])); - } - if (arr[3] != null) { - bean.setExternalId((String) arr[3]); - } - if (arr[4] != null) { - bean.setLastModifiedTime(DateUtils.toDate((Timestamp)arr[4])); - } - return bean; - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index 79ec28c..c0e6c19 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -59,7 +59,8 @@ public class CoordActionQueryExecutor extends GET_TERMINATED_ACTION_IDS_FOR_DATES, GET_ACTIVE_ACTIONS_FOR_DATES, GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, - GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN + GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, + GET_COORD_ACTION_FOR_SLA }; private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); @@ -177,6 +178,7 @@ public class CoordActionQueryExecutor extends switch (caQuery) { case GET_COORD_ACTION: case GET_COORD_ACTION_STATUS: + case GET_COORD_ACTION_FOR_SLA: query.setParameter("id", parameters[0]); break; case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: @@ -330,6 +332,15 @@ public class CoordActionQueryExecutor extends bean.setExternalId((String) arr[3]); bean.setPushMissingDependenciesBlob((StringBlob) arr[4]); break; + case GET_COORD_ACTION_FOR_SLA: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setId((String) arr[0]); + 
bean.setJobId((String) arr[1]); + bean.setStatusStr((String) arr[2]); + bean.setExternalId((String) arr[3]); + bean.setLastModifiedTime((Timestamp) arr[4]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java index 6663162..6ff9df8 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java @@ -37,7 +37,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu public enum SLASummaryQuery { UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, - UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, UPDATE_SLA_SUMMARY_ALL, UPDATE_SLA_SUMMARY_EVENTPROCESSED, UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, @@ -72,14 +71,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu query.setParameter("actualEndTS", bean.getActualEndTimestamp()); query.setParameter("actualDuration", bean.getActualDuration()); break; - case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES: - query.setParameter("jobId", bean.getId()); - query.setParameter("eventProcessed", bean.getEventProcessed()); - query.setParameter("actualStartTS", bean.getActualStartTimestamp()); - query.setParameter("actualEndTS", bean.getActualEndTimestamp()); - query.setParameter("actualDuration", bean.getActualDuration()); - query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp()); - break; case UPDATE_SLA_SUMMARY_ALL: query.setParameter("appName", bean.getAppName()); query.setParameter("appType", bean.getAppType().toString()); 
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java deleted file mode 100644 index 280294b..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.ErrorCode; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.ParamChecker; - -/** - * Retrieve the workflow action bean for sla service - */ -public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> { - - private String wfActionId; - - public WorkflowActionGetForSLAJPAExecutor(String wfActionId) { - ParamChecker.notNull(wfActionId, "wfActionId"); - this.wfActionId = wfActionId; - } - - @Override - public String getName() { - return "WorkflowActionGetForSLAJPAExecutor"; - } - - @Override - public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException { - try { - Query q = em.createNamedQuery("GET_ACTION_FOR_SLA"); - q.setParameter("id", wfActionId); - Object[] obj = (Object[]) q.getSingleResult(); - return getBeanFromArray(obj); - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - } - - private WorkflowActionBean getBeanFromArray(Object[] arr) { - WorkflowActionBean wab = new WorkflowActionBean(); - if (arr[0] != null) { - wab.setId((String) arr[0]); - } - if (arr[1] != null) { - wab.setStatus(WorkflowAction.Status.valueOf((String) arr[1])); - } - if (arr[2] != null) { - wab.setStartTime(DateUtils.toDate((Timestamp) arr[2])); - } - if (arr[3] != null) { - wab.setEndTime(DateUtils.toDate((Timestamp) arr[3])); - } - return wab; - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java index 078fd40..f01f090 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java @@ -57,7 +57,8 @@ public class WorkflowActionQueryExecutor extends GET_ACTION_COMPLETED, GET_RUNNING_ACTIONS, GET_PENDING_ACTIONS, - GET_ACTIONS_FOR_WORKFLOW_RERUN + GET_ACTIONS_FOR_WORKFLOW_RERUN, + GET_ACTION_FOR_SLA }; private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor(); @@ -202,6 +203,7 @@ public class WorkflowActionQueryExecutor extends case GET_ACTION_CHECK: case GET_ACTION_END: case GET_ACTION_COMPLETED: + case GET_ACTION_FOR_SLA: query.setParameter("id", parameters[0]); break; case GET_RUNNING_ACTIONS: @@ -363,6 +365,15 @@ public class WorkflowActionQueryExecutor extends bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); bean.setType((String) arr[4]); break; + case GET_ACTION_FOR_SLA: + bean = new WorkflowActionBean(); + arr = (Object[]) ret; + bean.setId((String) arr[0]); + bean.setStatusStr((String) arr[1]); + bean.setStartTime(DateUtils.toDate((Timestamp) arr[2])); + bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java deleted file mode 100644 index 774766f..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.ErrorCode; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.ParamChecker; - -/** - * Retrieve the workflow job bean for sla service - */ -public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> { - - private String wfJobId; - - public WorkflowJobGetForSLAJPAExecutor(String wfJobId) { - ParamChecker.notNull(wfJobId, "wfJobId"); - this.wfJobId = wfJobId; - } - - @Override - public String getName() { - return "WorkflowJobGetForSLAJPAExecutor"; - } - - @Override - public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException { - try { - Query q = em.createNamedQuery("GET_WORKFLOW_FOR_SLA"); - q.setParameter("id", wfJobId); - Object[] obj = (Object[]) q.getSingleResult(); - return getBeanFromArray(obj); - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - } - - private WorkflowJobBean 
getBeanFromArray(Object[] arr) { - WorkflowJobBean wjb = new WorkflowJobBean(); - if (arr[0] != null) { - wjb.setId((String) arr[0]); - } - if (arr[1] != null) { - wjb.setStatus(WorkflowJob.Status.valueOf((String) arr[1])); - } - if (arr[2] != null) { - wjb.setStartTime(DateUtils.toDate((Timestamp) arr[2])); - } - if (arr[3] != null) { - wjb.setEndTime(DateUtils.toDate((Timestamp) arr[3])); - } - return wjb; - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java index ce108d5..13fa54d 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java @@ -28,7 +28,6 @@ import javax.persistence.Query; import org.apache.oozie.BinaryBlob; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; -import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; @@ -60,7 +59,8 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor GET_WORKFLOW_RESUME, GET_WORKFLOW_STATUS, GET_WORKFLOWS_PARENT_COORD_RERUN, - GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN + GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, + GET_WORKFLOW_FOR_SLA }; private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor(); @@ -171,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor case GET_WORKFLOW_KILL: case GET_WORKFLOW_RESUME: case GET_WORKFLOW_STATUS: + case GET_WORKFLOW_FOR_SLA: query.setParameter("id", parameters[0]); break; case GET_WORKFLOWS_PARENT_COORD_RERUN: @@ 
-330,6 +331,14 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor bean.setId((String) arr[0]); bean.setParentId((String) arr[1]); break; + case GET_WORKFLOW_FOR_SLA: + bean = new WorkflowJobBean(); + arr = (Object[]) ret; + bean.setId((String) arr[0]); + bean.setStatusStr((String) arr[1]); + bean.setStartTime(DateUtils.toDate((Timestamp) arr[2])); + bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/service/ConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java index 4246764..9d4dcd9 100644 --- a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java +++ b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java @@ -544,12 +544,19 @@ public class ConfigurationService implements Service, Instrumentable { } public static long getLong(String name) { + return getLong(name, ConfigUtils.LONG_DEFAULT); + } + + public static long getLong(String name, long defultValue) { Configuration conf = Services.get().getConf(); - return getLong(conf, name); + return getLong(conf, name, defultValue); } public static long getLong(Configuration conf, String name) { - return conf.getLong(name, ConfigUtils.LONG_DEFAULT); + return getLong(conf, name, ConfigUtils.LONG_DEFAULT); + } + public static long getLong(Configuration conf, String name, long defultValue) { + return conf.getLong(name, defultValue); } public static Class<?>[] getClasses(String name) { @@ -590,4 +597,5 @@ public class ConfigurationService implements Service, Instrumentable { Configuration conf = Services.get().getConf(); return 
getPassword(conf, name, defaultValue); } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java index 5c0cfd9..3a76dfe 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java @@ -25,11 +25,6 @@ import java.util.Map; import org.apache.oozie.AppType; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.event.SLAEvent; -import org.apache.oozie.lock.LockToken; -import org.apache.oozie.service.JobsConcurrencyService; -import org.apache.oozie.service.MemoryLocksService; -import org.apache.oozie.service.Services; -import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XLog; @@ -49,7 +44,6 @@ public class SLACalcStatus extends SLAEvent { private long actualDuration = -1; private Date lastModifiedTime; private byte eventProcessed; - private LockToken lock; private XLog LOG; @@ -293,53 +287,5 @@ public class SLACalcStatus extends SLAEvent { public String getEntityKey() { return SLA_ENTITYKEY_PREFIX + this.getId(); } - /** - * Obtain an exclusive lock on the {link #getEntityKey}. - * <p> - * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 
- * - * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock - */ - public void acquireLock() throws InterruptedException { - // only get ZK lock when multiple servers running - if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) { - lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); - if (lock == null) { - LOG.debug("Could not aquire lock for [{0}]", getEntityKey()); - } - else { - LOG.debug("Acquired lock for [{0}]", getEntityKey()); - } - } - else { - lock = new DummyToken(); - } - } - - private static class DummyToken implements LockToken { - @Override - public void release() { - } - } - - public boolean isLocked() { - boolean locked = false; - if(lock != null) { - locked = true; - } - return locked; - } - - public void releaseLock(){ - if (lock != null) { - lock.release(); - lock = null; - LOG.debug("Released lock for [{0}]", getEntityKey()); - } - } - - public long getLockTimeOut() { - return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000); - } }
