LENS-128: SchedulerServiceImpl for scheduler.
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/1a96948e Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/1a96948e Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/1a96948e Branch: refs/heads/master Commit: 1a96948eeeb6433b7b2345cea086900004b631db Parents: acb32d5 Author: Lavkesh Lahngir <[email protected]> Authored: Mon Jul 25 17:35:55 2016 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Mon Jul 25 17:35:55 2016 +0530 ---------------------------------------------------------------------- .../error/InvalidStateTransitionException.java | 46 +++ .../lens/api/scheduler/SchedulerJobEvent.java | 31 ++ .../lens/api/scheduler/SchedulerJobInfo.java | 8 +- .../scheduler/SchedulerJobInstanceEvent.java | 33 ++ .../api/scheduler/SchedulerJobInstanceInfo.java | 44 +-- .../api/scheduler/SchedulerJobInstanceRun.java | 80 +++++ .../scheduler/SchedulerJobInstanceState.java | 157 ++++++++++ .../scheduler/SchedulerJobInstanceStatus.java | 23 -- .../lens/api/scheduler/SchedulerJobState.java | 109 +++++++ .../lens/api/scheduler/SchedulerJobStatus.java | 23 -- .../lens/api/scheduler/StateTransitioner.java | 34 ++ .../lens/cube/parse/TimerangeResolver.java | 14 +- .../FieldsCannotBeQueriedTogetherTest.java | 1 - .../lens/cube/parse/TestTimeRangeResolver.java | 19 ++ .../lens/server/api/LensConfConstants.java | 4 + .../error/InvalidStateTransitionException.java | 46 --- .../server/api/events/SchedulerAlarmEvent.java | 15 +- .../server/api/scheduler/SchedulerService.java | 65 ++-- .../lens/server/api/scheduler/StateMachine.java | 35 --- .../org/apache/lens/server/BaseLensService.java | 10 +- .../org/apache/lens/server/LensServices.java | 4 +- .../lens/server/metrics/MetricsServiceImpl.java | 10 +- .../lens/server/scheduler/AlarmService.java | 249 +++++++++++++++ .../lens/server/scheduler/ScheduleResource.java | 30 +- .../lens/server/scheduler/SchedulerDAO.java | 309 ++++++++++++++----- .../scheduler/SchedulerEventListener.java | 174 +++++++++++ .../scheduler/SchedulerQueryEventListener.java | 87 ++++++ .../server/scheduler/SchedulerServiceImpl.java | 260 +++++++++++----- .../notification/services/AlarmService.java | 220 ------------- .../state/SchedulerJobInstanceState.java | 193 ------------ .../scheduler/state/SchedulerJobState.java | 150 --------- .../server/scheduler/util/UtilityMethods.java | 52 ---- .../lens/server/session/LensSessionImpl.java | 2 +- .../apache/lens/server/util/UtilityMethods.java | 53 +++- .../src/main/resources/lensserver-default.xml | 16 +- .../lens/server/scheduler/AlarmServiceTest.java | 177 +++++++++++ .../lens/server/scheduler/SchedulerDAOTest.java | 80 ++--- .../scheduler/TestSchedulerServiceImpl.java | 236 ++++++++++++++ .../notification/services/AlarmServiceTest.java | 180 ----------- src/site/apt/admin/config.apt | 228 +++++++------- 40 files changed, 2147 insertions(+), 1360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/error/InvalidStateTransitionException.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/error/InvalidStateTransitionException.java b/lens-api/src/main/java/org/apache/lens/api/error/InvalidStateTransitionException.java new file mode 100644 index 0000000..a98bc30 --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/error/InvalidStateTransitionException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lens.api.error; + +/** + * Exception thrown during state transition of jobs and job instances. + */ +public class InvalidStateTransitionException extends Exception { + /** + * @param e Exception + */ + public InvalidStateTransitionException(Throwable e) { + super(e); + } + + /** + * @param message - custom exception message + * @param e + */ + public InvalidStateTransitionException(String message, Throwable e) { + super(message, e); + } + + /** + * @param message - custom exception message + */ + public InvalidStateTransitionException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobEvent.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobEvent.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobEvent.java new file mode 100644 index 0000000..c849151 --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobEvent.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.api.scheduler; + +/** + * All events(actions) which can happen on a Scheduler Job. + */ +public enum SchedulerJobEvent { + ON_SUBMIT, + ON_SCHEDULE, + ON_SUSPEND, + ON_RESUME, + ON_EXPIRE, + ON_DELETE +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java index 7d06689..b19248f 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java @@ -49,13 +49,13 @@ public class SchedulerJobInfo { private String userName; /** - * @param state state of this job. - * @return current state of this job + * @param jobstate of this job. + * @return current status of this job */ - private SchedulerJobStatus state; + private SchedulerJobState jobState; /** - * @param createdOn time to be set as createdOn. + * @param createdOn time to be set when it was created. * @return time when this job was submitted. */ private long createdOn; http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceEvent.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceEvent.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceEvent.java new file mode 100644 index 0000000..4f3409c --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceEvent.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.api.scheduler; + +/** + * All events(actions) which can happen on an instance of <code>SchedulerJob</code>. + */ +public enum SchedulerJobInstanceEvent { + ON_CREATION, // an instance is first considered by the scheduler. + ON_TIME_OUT, + ON_CONDITIONS_MET, + ON_RUN, + ON_SUCCESS, + ON_FAILURE, + ON_RERUN, + ON_KILL +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java index 8158576..52b56ca 100644 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java @@ -18,7 +18,7 @@ */ package org.apache.lens.api.scheduler; -import org.apache.lens.api.LensSessionHandle; +import java.util.List; import lombok.AllArgsConstructor; import lombok.Data; @@ -43,45 +43,15 @@ public class SchedulerJobInstanceInfo { private SchedulerJobHandle jobId; /** - * @param sessionHandle new session handle. - * @return session handle for this instance. + * @param scheduleTime time to be set as the nomial time for the instance. + * @return scheduled time of this instance. */ - private LensSessionHandle sessionHandle; + private long scheduleTime; /** - * @param startTime start time to be set for the instance. - * @return actual start time of this instance. + * @param instanceRunList: List of runs. + * @return A list of instance-run for this instance. */ - private long startTime; - - /** - * @param endTime end time to be set for the instance. - * @return actual finish time of this instance. - */ - private long endTime; - - /** - * @param resultPath result path to be set. - * @return result path of this instance. - */ - private String resultPath; - - /** - * @param query query to be set - * @return query of this instance. - */ - private String query; - - /** - * @param state state to be set. - * @return state of this instance. - */ - private SchedulerJobInstanceStatus state; - - /** - * @param createdOn time to be set as created_on time for the instance. - * @return created_on time of this instance. - */ - private long createdOn; + private List<SchedulerJobInstanceRun> instanceRunList; } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java new file mode 100644 index 0000000..e6c1571 --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.api.scheduler; + +import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.query.QueryHandle; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@AllArgsConstructor +@EqualsAndHashCode +public class SchedulerJobInstanceRun { + + /** + * @param handle : Instance handle + * @return the handle + */ + private SchedulerJobInstanceHandle handle; + + /** + * @param runId : run number of the instance run. Highest run number will represent the latest run. + * @return the runId + */ + private int runId; + + /** + * @param sessionHandle new session handle. + * @return session handle for this instance run. + */ + private LensSessionHandle sessionHandle; + /** + * @param startTime start time to be set for the instance run. + * @return actual start time of this instance run . + */ + private long startTime; + + /** + * @param endTime end time to be set for the instance run. + * @return actual finish time of this instance run. + */ + private long endTime; + + /** + * @param resultPath result path to be set. + * @return result path of this instance run. + */ + private String resultPath; + + /** + * @param queryHandle query to be set + * @return queryHandle of this instance run. + */ + private QueryHandle queryHandle; + + /** + * @param instanceState state to be set. + * @return status of this instance. + */ + private SchedulerJobInstanceState instanceState; + +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java new file mode 100644 index 0000000..93d3d7e --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.api.scheduler; + +import org.apache.lens.api.error.InvalidStateTransitionException; + +public enum SchedulerJobInstanceState + implements StateTransitioner<SchedulerJobInstanceState, SchedulerJobInstanceEvent> { + // repeating same operation will return the same state to ensure idempotent behavior. + WAITING { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_CREATION: + return this; + case ON_CONDITIONS_MET: + return SchedulerJobInstanceState.LAUNCHED; + case ON_TIME_OUT: + return SchedulerJobInstanceState.TIMED_OUT; + case ON_RUN: + return SchedulerJobInstanceState.RUNNING; + case ON_SUCCESS: + return SchedulerJobInstanceState.SUCCEEDED; + case ON_FAILURE: + return SchedulerJobInstanceState.FAILED; + case ON_KILL: + return SchedulerJobInstanceState.KILLED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + LAUNCHED { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_CONDITIONS_MET: + return this; + case ON_RUN: + return SchedulerJobInstanceState.RUNNING; + case ON_SUCCESS: + return SchedulerJobInstanceState.SUCCEEDED; + case ON_FAILURE: + return SchedulerJobInstanceState.FAILED; + case ON_KILL: + return SchedulerJobInstanceState.KILLED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + RUNNING { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_RUN: + return this; + case ON_SUCCESS: + return SchedulerJobInstanceState.SUCCEEDED; + case ON_FAILURE: + return SchedulerJobInstanceState.FAILED; + case ON_KILL: + return SchedulerJobInstanceState.KILLED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + FAILED { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_FAILURE: + return this; + case ON_RERUN: + return SchedulerJobInstanceState.LAUNCHED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + SUCCEEDED { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_SUCCESS: + return this; + case ON_RERUN: + return SchedulerJobInstanceState.LAUNCHED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + TIMED_OUT { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_TIME_OUT: + return this; + case ON_RERUN: + return SchedulerJobInstanceState.WAITING; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + KILLED { + @Override + public SchedulerJobInstanceState nextTransition(SchedulerJobInstanceEvent event) + throws InvalidStateTransitionException { + switch (event) { + case ON_KILL: + return this; + case ON_RERUN: + return SchedulerJobInstanceState.LAUNCHED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobInstanceEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceStatus.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceStatus.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceStatus.java deleted file mode 100644 index 85e7e85..0000000 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceStatus.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.api.scheduler; - -public enum SchedulerJobInstanceStatus { - WAITING, TIMED_OUT, LAUNCHED, RUNNING, FAILED, SUCCEEDED, KILLED, -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java new file mode 100644 index 0000000..ffaae6c --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.api.scheduler; + +import org.apache.lens.api.error.InvalidStateTransitionException; + +public enum SchedulerJobState implements StateTransitioner<SchedulerJobState, SchedulerJobEvent> { + // repeating same operation will return the same state to ensure idempotent behavior. + NEW { + @Override + public SchedulerJobState nextTransition(SchedulerJobEvent event) throws InvalidStateTransitionException { + switch (event) { + case ON_SUBMIT: + return this; + case ON_SCHEDULE: + return SchedulerJobState.SCHEDULED; + case ON_EXPIRE: + return SchedulerJobState.EXPIRED; + case ON_DELETE: + return SchedulerJobState.DELETED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + SCHEDULED { + @Override + public SchedulerJobState nextTransition(SchedulerJobEvent event) throws InvalidStateTransitionException { + switch (event) { + case ON_SCHEDULE: + return this; + case ON_SUSPEND: + return SchedulerJobState.SUSPENDED; + case ON_EXPIRE: + return SchedulerJobState.EXPIRED; + case ON_DELETE: + return SchedulerJobState.DELETED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + SUSPENDED { + @Override + public SchedulerJobState nextTransition(SchedulerJobEvent event) throws InvalidStateTransitionException { + switch (event) { + case ON_SUSPEND: + return this; + case ON_RESUME: + return SchedulerJobState.SCHEDULED; + case ON_EXPIRE: + return SchedulerJobState.EXPIRED; + case ON_DELETE: + return SchedulerJobState.DELETED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + EXPIRED { + @Override + public SchedulerJobState nextTransition(SchedulerJobEvent event) throws InvalidStateTransitionException { + switch (event) { + case ON_EXPIRE: + return this; + case ON_DELETE: + return SchedulerJobState.DELETED; + default: + throw new InvalidStateTransitionException( + "SchedulerJobEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + }, + + DELETED { + @Override + public SchedulerJobState nextTransition(SchedulerJobEvent event) throws InvalidStateTransitionException { + switch (event) { + case ON_DELETE: + return this; + default: + throw new InvalidStateTransitionException( + "SchedulerJobEvent: " + event.name() + " is not a valid event for state: " + this.name()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobStatus.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobStatus.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobStatus.java deleted file mode 100644 index aaf403c..0000000 --- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobStatus.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.api.scheduler; - -public enum SchedulerJobStatus { - NEW, SCHEDULED, SUSPENDED, EXPIRED, DELETED -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-api/src/main/java/org/apache/lens/api/scheduler/StateTransitioner.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/StateTransitioner.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/StateTransitioner.java new file mode 100644 index 0000000..d2742ef --- /dev/null +++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/StateTransitioner.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lens.api.scheduler; + +import org.apache.lens.api.error.InvalidStateTransitionException; + +/** + * Interface to be implemented by a class that handles state transitions. + */ +public interface StateTransitioner<STATE extends Enum<STATE> + & StateTransitioner<STATE, EVENT>, EVENT extends Enum<EVENT>> { + + /** + * @param event + * @return The state that the machine enters into as a result of the event. + * @throws InvalidStateTransitionException + */ + STATE nextTransition(EVENT event) throws InvalidStateTransitionException; +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java index 33ec9d9..06ba148 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java @@ -18,8 +18,6 @@ */ package org.apache.lens.cube.parse; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.FACT_NOT_AVAILABLE_IN_RANGE; - import static org.apache.hadoop.hive.ql.parse.HiveParser.*; import java.util.*; @@ -31,6 +29,7 @@ import org.apache.lens.cube.metadata.*; import org.apache.lens.cube.metadata.join.JoinPath; import org.apache.lens.cube.parse.DenormalizationResolver.ReferencedQueriedColumn; import org.apache.lens.cube.parse.join.AutoJoinContext; +import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; import org.apache.commons.lang.StringUtils; @@ -127,8 +126,13 @@ class TimerangeResolver implements ContextRewriter { toDateRaw = PlanUtils.stripQuotes(timenode.getChild(3).getText()); } } - - Date now = new Date(); + long currentTime = cubeql.getConf().getLong(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, 0); + Date now; + if (currentTime != 0) { + now = new Date(currentTime); + } else { + now = new Date(); + } builder.fromDate(DateUtil.resolveDate(fromDateRaw, now)); if (StringUtils.isNotBlank(toDateRaw)) { builder.toDate(DateUtil.resolveDate(toDateRaw, now)); @@ -256,6 +260,6 @@ class TimerangeResolver implements ContextRewriter { iter.remove(); } } - cubeql.pruneCandidateFactSet(FACT_NOT_AVAILABLE_IN_RANGE); + cubeql.pruneCandidateFactSet(CandidateTablePruneCause.CandidateTablePruneCode.FACT_NOT_AVAILABLE_IN_RANGE); } } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-cube/src/test/java/org/apache/lens/cube/parse/FieldsCannotBeQueriedTogetherTest.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/test/java/org/apache/lens/cube/parse/FieldsCannotBeQueriedTogetherTest.java b/lens-cube/src/test/java/org/apache/lens/cube/parse/FieldsCannotBeQueriedTogetherTest.java index fe2dfb3..7afa32e 100644 --- a/lens-cube/src/test/java/org/apache/lens/cube/parse/FieldsCannotBeQueriedTogetherTest.java +++ b/lens-cube/src/test/java/org/apache/lens/cube/parse/FieldsCannotBeQueriedTogetherTest.java @@ -47,7 +47,6 @@ public class FieldsCannotBeQueriedTogetherTest extends TestQueryRewrite { conf.setBoolean(CubeQueryConfUtil.ENABLE_SELECT_TO_GROUPBY, true); conf.setBoolean(CubeQueryConfUtil.DISABLE_AGGREGATE_RESOLVER, false); conf.setBoolean(CubeQueryConfUtil.DISABLE_AUTO_JOINS, false); - } @Test http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-cube/src/test/java/org/apache/lens/cube/parse/TestTimeRangeResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/test/java/org/apache/lens/cube/parse/TestTimeRangeResolver.java b/lens-cube/src/test/java/org/apache/lens/cube/parse/TestTimeRangeResolver.java index c8a9ac6..280a8c4 100644 --- a/lens-cube/src/test/java/org/apache/lens/cube/parse/TestTimeRangeResolver.java +++ b/lens-cube/src/test/java/org/apache/lens/cube/parse/TestTimeRangeResolver.java @@ -26,16 +26,21 @@ import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTable import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.util.Calendar; +import java.util.GregorianCalendar; import java.util.List; import java.util.Set; import org.apache.lens.cube.error.NoCandidateFactAvailableException; +import org.apache.lens.cube.metadata.TimeRange; +import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.ParseException; +import org.joda.time.DateTime; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -96,4 +101,18 @@ public class TestTimeRangeResolver extends TestQueryRewrite { assertEquals(pruningMsg.getCause(), FACT_NOT_AVAILABLE_IN_RANGE); assertTrue(pruningMsg.getInvalidRanges().containsAll(ctx.getTimeRanges())); } + + @Test + public void testCustomNow() throws Exception { + Configuration conf = getConf(); + DateTime dt = new DateTime(1990, 3, 23, 12, 0, 0, 0); + conf.setLong(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, dt.getMillis()); + CubeQueryContext ctx = rewriteCtx("select msr12 from basecube where time_range_in(d_time, 'now.day-275days','now')", + conf); + TimeRange timeRange = ctx.getTimeRanges().get(0); + // Month starts from zero. + Calendar from = new GregorianCalendar(1989, 5, 21, 0, 0, 0); + assertEquals(timeRange.getFromDate(), from.getTime()); + assertEquals(timeRange.getToDate(), dt.toDate()); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java index 43bd7e4..72e2b61 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java @@ -1094,4 +1094,8 @@ public final class LensConfConstants { */ public static final java.lang.String SCHEDULER_STORE_CLASS = SERVER_PFX + "scheduler.store.class"; + /** + * Query current time for the scheduled query. + */ + public static final String QUERY_CURRENT_TIME_IN_MILLIS = QUERY_PFX + "current.time.millis"; } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server-api/src/main/java/org/apache/lens/server/api/error/InvalidStateTransitionException.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/error/InvalidStateTransitionException.java b/lens-server-api/src/main/java/org/apache/lens/server/api/error/InvalidStateTransitionException.java deleted file mode 100644 index 5882151..0000000 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/error/InvalidStateTransitionException.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.lens.server.api.error; - -/** - * Exception thrown during state transition of jobs and job instances. - */ -public class InvalidStateTransitionException extends LensException { - /** - * @param e Exception - */ - public InvalidStateTransitionException(Throwable e) { - super(e); - } - - /** - * @param message - custom exception message - * @param e - */ - public InvalidStateTransitionException(String message, Throwable e) { - super(message, e); - } - - /** - * @param message - custom exception message - */ - public InvalidStateTransitionException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java index 3ca7eb9..0f2dabe 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java @@ -19,6 +19,7 @@ package org.apache.lens.server.api.events; import org.apache.lens.api.scheduler.SchedulerJobHandle; +import org.apache.lens.api.scheduler.SchedulerJobInstanceHandle; import org.joda.time.DateTime; @@ -34,13 +35,17 @@ public class SchedulerAlarmEvent extends LensEvent { * jobHandle for which the alarm needs to be triggered. */ private SchedulerJobHandle jobHandle; - private DateTime nominalTime; + private EventType type; + private SchedulerJobInstanceHandle previousInstance; - public SchedulerAlarmEvent(SchedulerJobHandle jobHandle, DateTime nominalTime) { + public SchedulerAlarmEvent(SchedulerJobHandle jobHandle, DateTime nominalTime, EventType type, + SchedulerJobInstanceHandle previousInstance) { super(nominalTime.getMillis()); this.jobHandle = jobHandle; this.nominalTime = nominalTime; + this.type = type; + this.previousInstance = previousInstance; } @Override @@ -48,4 +53,10 @@ public class SchedulerAlarmEvent extends LensEvent { return jobHandle.getHandleIdString(); } + /** + * Event type to know what kind of operations we want. + */ + public static enum EventType { + SCHEDULE, EXPIRE + } } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java index d0af876..c7f73eb 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,13 +27,14 @@ import org.apache.lens.server.api.LensService; import org.apache.lens.server.api.SessionValidator; import org.apache.lens.server.api.error.LensException; - /** * Scheduler interface. */ public interface SchedulerService extends LensService, SessionValidator { - /** The constant NAME */ + /** + * The constant name for scheduler service. + */ String NAME = "scheduler"; /** @@ -51,10 +52,10 @@ public interface SchedulerService extends LensService, SessionValidator { * * @param sessionHandle handle for the current session. * @param jobHandle handle for the job to be scheduled. + * @return true if there is a successful schedule * @throws LensException the lens exception */ - void scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; - + boolean scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; /** * Submit a job and also schedule it. @@ -76,7 +77,6 @@ public interface SchedulerService extends LensService, SessionValidator { */ XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; - /** * Returns the details of a job. Details may contain extra system information like id for the job. * @@ -85,12 +85,11 @@ public interface SchedulerService extends LensService, SessionValidator { * @return job details for the job * @throws LensException the lens exception */ - SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, - SchedulerJobHandle jobHandle) throws LensException; + SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; /** * Update a job with new definition. - * + * <p> * Updates will be applied only for newer instances. Running instances will be running with old definition * * @param sessionHandle @@ -99,23 +98,22 @@ public interface SchedulerService extends LensService, SessionValidator { * @return true or false based on whether the update was successful or failed. * @throws LensException the lens exception */ - boolean updateJob(LensSessionHandle sessionHandle, - SchedulerJobHandle jobHandle, XJob newJobDefinition) throws LensException; - + boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, XJob newJobDefinition) + throws LensException; /** * End a job by specifying an expiry time. * * @param sessionHandle handle for the current session. * @param jobHandle handle for the job + * @return true if the operation is successful. * @throws LensException the lens exception */ - void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; - + boolean expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; /** * Suspend a job. - * + * <p> * If the job is not in scheduled state, it will return true. * Once a job is suspended, no further instances of that job will run. * Any running instances of that job will continue normally. @@ -127,7 +125,6 @@ public interface SchedulerService extends LensService, SessionValidator { */ boolean suspendJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; - /** * Resume a job from a given time. * @@ -148,20 +145,21 @@ public interface SchedulerService extends LensService, SessionValidator { */ boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException; - /** * @param sessionHandle handle for the current session. * @param state filter for status, if specified only jobs in that state will be returned, * if null no entries will be removed from result * @param user filter for user who submitted the job, if specified only jobs submitted by the given user * will be returned, if not specified no entries will be removed from result on basis of userName + * @param jobName filter for jobName, if specified only the jobs with name same as given name will be considered + * , else no jobs will be filtered out on the basis of name. * @param startTime if specified only instances with scheduleTime after this time will be considered. * @param endTime if specified only instances with scheduleTime before this time will be considered. * @return A collection of stats per job * @throws LensException */ - Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, - String state, String user, long startTime, long endTime) throws LensException; + Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user, + String jobName, long startTime, long endTime) throws LensException; /** * Returns stats for a job. @@ -174,9 +172,8 @@ public interface SchedulerService extends LensService, SessionValidator { * @param endTime if specified only instances with scheduleTime before this time will be considered. * @throws LensException the lens exception */ - SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, - String state, long startTime, long endTime) throws LensException; - + SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state, + long startTime, long endTime) throws LensException; /** * Returns handles for last <code>numResults</code> instances for the job. @@ -184,15 +181,15 @@ public interface SchedulerService extends LensService, SessionValidator { * @param sessionHandle handle for the session. * @param jobHandle handle for the job * @param numResults - number of results to be returned, default 100. - * @return list of instances for the job + * @return list of instance ids for the job * @throws LensException the lens exception */ - List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, - SchedulerJobHandle jobHandle, Long numResults) throws LensException; + List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, + Long numResults) throws LensException; /** * Kills a running job instance. - * + * <p> * If the job instance is already completed or not in running state, this will be a no-op and will return false. * * @param sessionHandle handle for the session. @@ -200,12 +197,11 @@ public interface SchedulerService extends LensService, SessionValidator { * @return true if the instance was killed successfully, false otherwise. * @throws LensException the lens exception */ - boolean killInstance(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException; + boolean killInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) throws LensException; /** * Reruns a failed/killed/completed job instance. - * + * <p> * If the instance is not in a terminal state, then this operation will be a no-op and will return false. * * @param sessionHandle handle for the session. @@ -213,8 +209,8 @@ public interface SchedulerService extends LensService, SessionValidator { * @return true if the instance was re run successfully, false otherwise. * @throws LensException the lens exception */ - boolean rerunInstance(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException; + boolean rerunInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) + throws LensException; /** * Instance details for an instance. @@ -225,7 +221,6 @@ public interface SchedulerService extends LensService, SessionValidator { * @throws LensException the lens exception */ SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException; - + SchedulerJobInstanceHandle instanceHandle) throws LensException; } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/StateMachine.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/StateMachine.java b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/StateMachine.java deleted file mode 100644 index ba6a0a2..0000000 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/StateMachine.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.lens.server.api.scheduler; - - -import org.apache.lens.server.api.error.InvalidStateTransitionException; - -/** - * Interface to be implemented by a class that handles state transitions. - */ -public interface StateMachine<STATE extends Enum<STATE>, EVENT extends Enum<EVENT>> { - - /** - * @param event - * @return The state that the machine enters into as a result of the event. - * @throws InvalidStateTransitionException - */ - STATE nextTransition(EVENT event) throws InvalidStateTransitionException; - -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java index 74bc0be..d706306 100644 --- a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java +++ b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java @@ -146,6 +146,7 @@ public abstract class BaseLensService extends CompositeService implements Extern return numSessions != null && numSessions >= maxNumSessionsPerUser; } + /** * Open session. * @@ -157,12 +158,19 @@ public abstract class BaseLensService extends CompositeService implements Extern */ public LensSessionHandle openSession(String username, String password, Map<String, String> configuration) throws LensException { + return openSession(username, password, configuration, true); + } + + public LensSessionHandle openSession(String username, String password, Map<String, String> configuration, + boolean auth) throws LensException { if (StringUtils.isBlank(username)) { throw new BadRequestException("User name cannot be null or empty"); } SessionHandle sessionHandle; username = UtilityMethods.removeDomain(username); - doPasswdAuth(username, password); + if (auth) { + doPasswdAuth(username, password); + } SessionUser sessionUser = SESSION_USER_INSTANCE_MAP.get(username); if (sessionUser == null) { sessionUser = new SessionUser(username); http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/LensServices.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/LensServices.java b/lens-server/src/main/java/org/apache/lens/server/LensServices.java index 56094e0..a709f16 100644 --- a/lens-server/src/main/java/org/apache/lens/server/LensServices.java +++ b/lens-server/src/main/java/org/apache/lens/server/LensServices.java @@ -37,7 +37,7 @@ import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.metrics.MetricsServiceImpl; import org.apache.lens.server.model.LogSegregationContext; import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; -import org.apache.lens.server.scheduler.notification.services.AlarmService; +import org.apache.lens.server.scheduler.AlarmService; import org.apache.lens.server.session.LensSessionImpl; import org.apache.lens.server.stats.StatisticsService; import org.apache.lens.server.user.UserConfigLoaderFactory; @@ -179,7 +179,7 @@ public class LensServices extends CompositeService implements ServiceProvider { } // This is only for test, to simulate a restart of the server - static void setInstance(LensServices newInstance) { + public static void setInstance(LensServices newInstance) { instance = newInstance; } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java index f1ee169..e09180f 100644 --- a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java @@ -40,16 +40,12 @@ import org.apache.lens.server.api.metastore.CubeMetastoreService; import org.apache.lens.server.api.metrics.*; import org.apache.lens.server.api.query.QueryExecutionService; import org.apache.lens.server.api.query.StatusChange; -import org.apache.lens.server.api.session.SessionClosed; -import org.apache.lens.server.api.session.SessionEvent; -import org.apache.lens.server.api.session.SessionExpired; -import org.apache.lens.server.api.session.SessionOpened; -import org.apache.lens.server.api.session.SessionService; +import org.apache.lens.server.api.session.*; import org.apache.lens.server.healthcheck.LensServiceHealthCheck; import org.apache.lens.server.query.QueryExecutionServiceImpl; import org.apache.lens.server.quota.QuotaServiceImpl; +import org.apache.lens.server.scheduler.AlarmService; import org.apache.lens.server.scheduler.SchedulerServiceImpl; -import org.apache.lens.server.scheduler.notification.services.AlarmService; import org.apache.lens.server.session.DatabaseResourceService; import org.apache.lens.server.session.HiveSessionService; @@ -67,11 +63,9 @@ import com.codahale.metrics.health.HealthCheckRegistry; import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.codahale.metrics.jvm.ThreadStatesGaugeSet; - import info.ganglia.gmetric4j.gmetric.GMetric; import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode; import lombok.Getter; - import lombok.extern.slf4j.Slf4j; /** http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java new file mode 100644 index 0000000..217879f --- /dev/null +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lens.server.scheduler; + +import org.apache.lens.api.scheduler.SchedulerJobHandle; +import org.apache.lens.api.scheduler.XFrequency; +import org.apache.lens.api.scheduler.XFrequencyEnum; +import org.apache.lens.server.LensServices; +import org.apache.lens.server.api.LensService; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.events.LensEventService; +import org.apache.lens.server.api.events.SchedulerAlarmEvent; +import org.apache.lens.server.api.health.HealthStatus; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.AbstractService; + +import org.joda.time.DateTime; +import org.quartz.*; +import org.quartz.impl.StdSchedulerFactory; + +import lombok.extern.slf4j.Slf4j; + +/** + * This service is used primarily by Scheduler to get alarm notifications for scheduled queries. + * <p> + * As a schedule this service accepts start time, frequency, end time and timeZone. It also requires the + * {@link SchedulerJobHandle} which it sends as part of the + * {@link org.apache.lens.server.api.events.SchedulerAlarmEvent} to inform the scheduler about the job for which + * job the notification has been generated. + */ +@Slf4j +public class AlarmService extends AbstractService implements LensService { + + public static final String NAME = "alarm-service"; + + public static final String LENS_JOBS = "LensJobs"; + public static final String ALARM_SERVICE = "AlarmService"; + + private Scheduler scheduler; + + /** + * True if the service started properly and is running fine, false otherwise. + */ + private boolean isHealthy = true; + + /** + * Contains the reason if service is not healthy. + */ + private String healthCause; + + /** + * Creates a new instance of AlarmService. + * + * @param name the name + */ + public AlarmService(String name) { + super(name); + } + + @Override + public HealthStatus getHealthStatus() { + return isHealthy + ? new HealthStatus(isHealthy, "Alarm service is healthy.") + : new HealthStatus(isHealthy, healthCause); + } + + public synchronized void init(HiveConf hiveConf) { + super.init(hiveConf); + try { + this.scheduler = StdSchedulerFactory.getDefaultScheduler(); + } catch (SchedulerException e) { + isHealthy = false; + healthCause = "Failed to initialize the Quartz Scheduler for AlarmService."; + log.error(healthCause, e); + throw new IllegalStateException("Could not initialize the Alarm Service", e); + } + } + + @Override + public synchronized void start() { + try { + scheduler.start(); + log.info("Alarm service started successfully!"); + } catch (SchedulerException e) { + isHealthy = false; + healthCause = "Failed to start the Quartz Scheduler for AlarmService."; + log.error(healthCause, e); + throw new IllegalStateException("Could not start the Alarm service", e); + } + } + + @Override + public synchronized void stop() { + try { + scheduler.standby(); + log.info("Alarm Service stopped successfully."); + } catch (SchedulerException e) { + log.error("Failed to shut down the Quartz Scheduler for AlarmService.", e); + } + } + + /** + * This method can be used by any consumer who wants to receive notifications during a time range at a given + * frequency. + * <p> + * This method is intended to be used by LensScheduler to subscribe for time based notifications to schedule queries. + * On receiving a job to be scheduled LensScheduler will subscribe to all triggers required for the job, including + * AlarmService for time based triggers. + * + * @param start start time for notifications + * @param end end time for notifications + * @param frequency Frequency to determine the frequency at which notification should be sent. + * @param jobHandle Must be a unique jobHanlde across all consumers + */ + public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException { + // accept the schedule and then keep on sending the notifications for that schedule + JobDataMap map = new JobDataMap(); + map.put("jobHandle", jobHandle); + + JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, LENS_JOBS).usingJobData(map).build(); + + Trigger trigger; + if (frequency.getEnum() != null) { //for enum expression: create a trigger using calendar interval + CalendarIntervalScheduleBuilder scheduleBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule() + .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum())) + .withMisfireHandlingInstructionIgnoreMisfires(); + trigger = TriggerBuilder.newTrigger().withIdentity(jobHandle, ALARM_SERVICE).startAt(start.toDate()) + .endAt(end.toDate()).withSchedule(scheduleBuilder).build(); + } else { // for cron expression create a cron trigger + trigger = TriggerBuilder.newTrigger().withIdentity(jobHandle, ALARM_SERVICE).startAt(start.toDate()) + .endAt(end.toDate()).withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression())).build(); + } + + // Tell quartz to run the job using our trigger + try { + scheduler.scheduleJob(job, trigger); + } catch (SchedulerException e) { + log.error("Error scheduling job with jobHandle: {}", jobHandle); + throw new LensException("Failed to schedule job with jobHandle: " + jobHandle, e); + } + } + + private int getTimeInterval(XFrequencyEnum frequencyEnum) { + // since quarterly is not supported natively, we express it as 3 months + return frequencyEnum == XFrequencyEnum.QUARTERLY ? 3 : 1; + } + + // Maps the timeunit in entity specification to the one in Quartz DateBuilder + private DateBuilder.IntervalUnit getTimeUnit(XFrequencyEnum frequencyEnum) { + switch (frequencyEnum) { + + case DAILY: + return DateBuilder.IntervalUnit.DAY; + + case WEEKLY: + return DateBuilder.IntervalUnit.WEEK; + + case MONTHLY: + return DateBuilder.IntervalUnit.MONTH; + + case QUARTERLY: + return DateBuilder.IntervalUnit.MONTH; + + case YEARLY: + return DateBuilder.IntervalUnit.YEAR; + + default: + throw new IllegalArgumentException("Invalid frequency enum expression: " + frequencyEnum.name()); + } + } + + public boolean unSchedule(SchedulerJobHandle jobHandle) throws LensException { + // stop sending notifications for this job handle + try { + return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS)); + } catch (SchedulerException e) { + log.error("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); + throw new LensException("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); + } + } + + public boolean checkExists(SchedulerJobHandle handle) throws LensException { + try { + return scheduler.checkExists(JobKey.jobKey(handle.getHandleIdString(), LENS_JOBS)); + } catch (SchedulerException e) { + log.error("Failed to check the job with jobHandle: " + handle, e); + throw new LensException("Failed to check the job with jobHandle: " + handle, e); + } + } + + public void pauseJob(SchedulerJobHandle jobHandle) throws LensException { + try { + scheduler.pauseJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS)); + } catch (SchedulerException e) { + log.error("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e); + throw new LensException("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e); + } + } + + public void resumeJob(SchedulerJobHandle jobHandle) throws LensException { + try { + scheduler.resumeJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS)); + } catch (SchedulerException e) { + log.error("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e); + throw new LensException("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e); + } + } + + public static class LensJob implements Job { + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + JobDataMap data = jobExecutionContext.getMergedJobDataMap(); + DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime()); + SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle")); + SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime, + SchedulerAlarmEvent.EventType.SCHEDULE, null); + try { + LensEventService eventService = LensServices.get().getService(LensEventService.NAME); + eventService.notifyEvent(alarmEvent); + if (jobExecutionContext.getNextFireTime() == null) { + eventService + .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null)); + } + } catch (LensException e) { + log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}", + jobHandle.getHandleIdString(), nominalTime.toString(), e); + throw new JobExecutionException("Failed to notify alarmEvent", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java index 39c4d98..8603edf 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java @@ -40,7 +40,7 @@ import org.apache.commons.lang3.StringUtils; @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public class ScheduleResource { - public static enum INSTANCE_ACTIONS { + public enum INSTANCE_ACTIONS { KILL, RERUN; public static INSTANCE_ACTIONS fromString(String name) { @@ -48,7 +48,7 @@ public class ScheduleResource { } } - public static enum JOB_ACTIONS { + public enum JOB_ACTIONS { SCHEDULE, EXPIRE, SUSPEND, RESUME; public static JOB_ACTIONS fromString(String name) { @@ -89,12 +89,12 @@ public class ScheduleResource { @GET @Path("jobs/stats") public Collection<SchedulerJobStats> getAllJobStats(@QueryParam("sessionid") LensSessionHandle sessionId, - @DefaultValue("running") @QueryParam("state") String state, + @DefaultValue("running") @QueryParam("status") String status, @QueryParam("name") String jobName, @DefaultValue("user") @QueryParam("user") String user, @DefaultValue("-1") @QueryParam("start") long start, @DefaultValue("-1") @QueryParam("end") long end) throws LensException { - return getSchedulerService().getAllJobStats(sessionId, state, user, start, end); + return getSchedulerService().getAllJobStats(sessionId, status, user, jobName, start, end); } @GET @@ -187,20 +187,30 @@ public class ScheduleResource { @QueryParam("action") INSTANCE_ACTIONS action) throws LensException { validateSession(sessionId); + APIResult res; switch (action) { case KILL: - getSchedulerService().killInstance(sessionId, instanceHandle); + if (getSchedulerService().killInstance(sessionId, instanceHandle)) { + res = new APIResult(APIResult.Status.SUCCEEDED, + "Killing the instance with id " + instanceHandle + " was successful"); + } else { + res = new APIResult(APIResult.Status.FAILED, + "Killing the instance with id " + instanceHandle + " was not successful"); + } break; - case RERUN: - getSchedulerService().rerunInstance(sessionId, instanceHandle); + if (getSchedulerService().rerunInstance(sessionId, instanceHandle)) { + res = new APIResult(APIResult.Status.SUCCEEDED, + "Rerunning the instance with id " + instanceHandle + " was successful"); + } else { + res = new APIResult(APIResult.Status.FAILED, + "Rerunning the instance with id " + instanceHandle + " was not successful"); + } break; - default: throw new BadRequestException("Unsupported action " + action.toString()); } - return APIResult.success(); + return res; } - }
