jcamachor commented on a change in pull request #794: HIVE-21884
URL: https://github.com/apache/hive/pull/794#discussion_r332279975
##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
##########
@@ -12583,4 +12600,306 @@ public static boolean isCurrentStatsValidForTheQuery(
return false;
}
+
+ @Override
+ public ScheduledQueryPollResponse scheduledQueryPoll(ScheduledQueryPollRequest request) {
+ String namespace = request.getClusterNamespace();
+ boolean commited = false;
+ ScheduledQueryPollResponse ret = new ScheduledQueryPollResponse();
+ try {
+ openTransaction();
+ Query q = pm.newQuery(MScheduledQuery.class,
+ "nextExecution <= now && enabled && clusterNamespace == ns");
+ q.setSerializeRead(true);
+ q.declareParameters("java.lang.Integer now, java.lang.String ns");
+ q.setOrdering("nextExecution");
+ int now = (int) (System.currentTimeMillis() / 1000);
+ List<MScheduledQuery> results = (List<MScheduledQuery>) q.execute(now, request.getClusterNamespace());
+ if (results == null || results.isEmpty()) {
+ return new ScheduledQueryPollResponse();
+ }
+ MScheduledQuery schq = results.get(0);
+ Integer plannedExecutionTime = schq.getNextExecution();
+ schq.setNextExecution(computeNextExecutionTime(schq.getSchedule()));
+
+ MScheduledExecution execution = new MScheduledExecution();
+ execution.setScheduledQuery(schq);
+ execution.setState(QueryState.INITED);
+ execution.setStartTime(now);
+ execution.setLastUpdateTime(now);
+ pm.makePersistent(execution);
+ pm.makePersistent(schq);
+ ObjectStoreTestHook.onScheduledQueryPoll();
+ commited = commitTransaction();
+ ret.setScheduleKey(schq.getScheduleKey());
+ ret.setQuery(schq.getQuery());
+ ret.setUser(schq.getUser());
+ int executionId = ((IntIdentity) pm.getObjectId(execution)).getKey();
+ ret.setExecutionId(executionId);
+ } catch (JDOException e) {
+ LOG.debug("Caught jdo exception; exclusive", e);
+ commited = false;
+ } finally {
+ if (commited) {
+ return ret;
+ } else {
+ rollbackTransaction();
+ return new ScheduledQueryPollResponse();
+ }
+ }
+ }
+
+ @Override
+ public void scheduledQueryProgress(ScheduledQueryProgressInfo info) throws InvalidOperationException {
+ boolean commited = false;
+ try {
+ openTransaction();
+ MScheduledExecution execution = pm.getObjectById(MScheduledExecution.class, info.getScheduledExecutionId());
+ if (!validateStateChange(execution.getState(), info.getState())) {
+ throw new InvalidOperationException("Invalid state change: " + execution.getState() + "=>" + info.getState());
Review comment:
Cool!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]