[ 
https://issues.apache.org/jira/browse/HIVE-21884?focusedWorklogId=324732&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-324732
 ]

ASF GitHub Bot logged work on HIVE-21884:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/19 23:46
            Start Date: 07/Oct/19 23:46
    Worklog Time Spent: 10m 
      Work Description: jcamachor commented on pull request #794: HIVE-21884
URL: https://github.com/apache/hive/pull/794#discussion_r332280126
 
 

 ##########
 File path: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 ##########
 @@ -12583,4 +12600,306 @@ public static boolean isCurrentStatsValidForTheQuery(
 
     return false;
   }
+
+  @Override
+  public ScheduledQueryPollResponse 
scheduledQueryPoll(ScheduledQueryPollRequest request) {
+    String namespace = request.getClusterNamespace();
+    boolean commited = false;
+    ScheduledQueryPollResponse ret = new ScheduledQueryPollResponse();
+    try {
+      openTransaction();
+      Query q = pm.newQuery(MScheduledQuery.class,
+          "nextExecution <= now && enabled && clusterNamespace == ns");
+      q.setSerializeRead(true);
+      q.declareParameters("java.lang.Integer now, java.lang.String ns");
+      q.setOrdering("nextExecution");
+      int now = (int) (System.currentTimeMillis() / 1000);
+      List<MScheduledQuery> results = (List<MScheduledQuery>) q.execute(now, 
request.getClusterNamespace());
+      if (results == null || results.isEmpty()) {
+        return new ScheduledQueryPollResponse();
+      }
+      MScheduledQuery schq = results.get(0);
+      Integer plannedExecutionTime = schq.getNextExecution();
+      schq.setNextExecution(computeNextExecutionTime(schq.getSchedule()));
+
+      MScheduledExecution execution = new MScheduledExecution();
+      execution.setScheduledQuery(schq);
+      execution.setState(QueryState.INITED);
+      execution.setStartTime(now);
+      execution.setLastUpdateTime(now);
+      pm.makePersistent(execution);
+      pm.makePersistent(schq);
+      ObjectStoreTestHook.onScheduledQueryPoll();
+      commited = commitTransaction();
+      ret.setScheduleKey(schq.getScheduleKey());
+      ret.setQuery(schq.getQuery());
+      ret.setUser(schq.getUser());
+      int executionId = ((IntIdentity) pm.getObjectId(execution)).getKey();
+      ret.setExecutionId(executionId);
+    } catch (JDOException e) {
+      LOG.debug("Caught jdo exception; exclusive", e);
+      commited = false;
+    } finally {
+      if (commited) {
+        return ret;
+      } else {
+        rollbackTransaction();
+        return new ScheduledQueryPollResponse();
+      }
+    }
+  }
+
+  @Override
+  public void scheduledQueryProgress(ScheduledQueryProgressInfo info) throws 
InvalidOperationException {
+    boolean commited = false;
+    try {
+      openTransaction();
+      MScheduledExecution execution = 
pm.getObjectById(MScheduledExecution.class, info.getScheduledExecutionId());
+      if (!validateStateChange(execution.getState(), info.getState())) {
+        throw new InvalidOperationException("Invalid state change: " + 
execution.getState() + "=>" + info.getState());
+      }
+      execution.setState(info.getState());
+      if (info.isSetExecutorQueryId()) {
+        execution.setExecutorQueryId(info.getExecutorQueryId());
+      }
+      if (info.isSetErrorMessage()) {
+        execution.setErrorMessage(info.getErrorMessage());
+      }
+
+      switch (info.getState()) {
+      case INITED:
+      case EXECUTING:
+        execution.setLastUpdateTime((int) (System.currentTimeMillis() / 1000));
+        break;
+      case ERRORED:
+      case FINISHED:
+      case TIMED_OUT:
+        execution.setEndTime((int) (System.currentTimeMillis() / 1000));
+        execution.setLastUpdateTime(null);
+      }
+      pm.makePersistent(execution);
+      commited = commitTransaction();
+    } finally {
+      if (commited) {
+      } else {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  private boolean validateStateChange(QueryState from, QueryState to) {
+    switch (from) {
+    case INITED:
+      return to != QueryState.INITED;
+    case EXECUTING:
+      return to == QueryState.FINISHED
+          || to == QueryState.EXECUTING
+          || to == QueryState.ERRORED;
+    default:
+      return false;
+    }
+  }
+
+  private Integer computeNextExecutionTime(String schedule) throws 
InvalidInputException {
+    CronType cronType = CronType.valueOf(MetastoreConf.getVar(this.conf, 
ConfVars.SCHEDULED_QUERIES_CRON_SYNTAX));
+
+    CronDefinition cronDefinition = 
CronDefinitionBuilder.instanceDefinitionFor(cronType);
+    CronParser parser = new CronParser(cronDefinition);
+
+    // Get date for last execution
+    try {
+      ZonedDateTime now = ZonedDateTime.now();
+      ExecutionTime executionTime = 
ExecutionTime.forCron(parser.parse(schedule));
+      Optional<ZonedDateTime> nextExecution = executionTime.nextExecution(now);
+      if (!nextExecution.isPresent()) {
+        // no valid next execution time.
+        return null;
+      }
+      return (int) nextExecution.get().toEpochSecond();
+    } catch (IllegalArgumentException iae) {
+      String message = "Invalid " + cronType + " schedule expression: '" + 
schedule + "'";
+      LOG.error(message, iae);
+      throw new InvalidInputException(message);
+    }
+  }
+
+  @Override
+  public void scheduledQueryMaintenance(ScheduledQueryMaintenanceRequest 
request)
+      throws MetaException, NoSuchObjectException, AlreadyExistsException, 
InvalidInputException {
+    switch (request.getType()) {
+    case INSERT:
+      scheduledQueryInsert(request.getScheduledQuery());
+      break;
+    case UPDATE:
+      scheduledQueryUpdate(request.getScheduledQuery());
+      break;
+    case DELETE:
+      scheduledQueryDelete(request.getScheduledQuery());
+      break;
+    default:
+      throw new MetaException("invalid request");
+    }
+  }
+
+  public void scheduledQueryInsert(ScheduledQuery scheduledQuery)
+      throws NoSuchObjectException, AlreadyExistsException, 
InvalidInputException {
+    MScheduledQuery schq = MScheduledQuery.fromThrift(scheduledQuery);
+    boolean commited = false;
+    try {
+      Optional<MScheduledQuery> existing = 
getMScheduledQuery(scheduledQuery.getScheduleKey());
+      if (existing.isPresent()) {
+        throw new AlreadyExistsException(
+            "Scheduled query with name: " + 
scheduledQueryKeyRef(scheduledQuery.getScheduleKey()) + " already exists.");
+      }
+      openTransaction();
+      Integer nextExecutionTime = computeNextExecutionTime(schq.getSchedule());
+      if (nextExecutionTime == null) {
+        throw new InvalidInputException("Invalid schedule: " + 
schq.getSchedule());
+      }
+      schq.setNextExecution(nextExecutionTime);
+      pm.makePersistent(schq);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  public void scheduledQueryDelete(ScheduledQuery scheduledQuery) throws 
NoSuchObjectException, AlreadyExistsException {
+    MScheduledQuery schq = MScheduledQuery.fromThrift(scheduledQuery);
+    boolean commited = false;
+    try {
+      openTransaction();
+      Optional<MScheduledQuery> existing = 
getMScheduledQuery(scheduledQuery.getScheduleKey());
+      if (!existing.isPresent()) {
+        throw new NoSuchObjectException(
+            "Scheduled query with name: " + 
scheduledQueryKeyRef(schq.getScheduleKey()) + " doesn't exists.");
+      }
+      MScheduledQuery persisted = existing.get();
+      pm.deletePersistent(persisted);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  public void scheduledQueryUpdate(ScheduledQuery scheduledQuery)
+      throws NoSuchObjectException, AlreadyExistsException, 
InvalidInputException {
+    MScheduledQuery schq = MScheduledQuery.fromThrift(scheduledQuery);
+    boolean commited = false;
+    try {
+      Optional<MScheduledQuery> existing = 
getMScheduledQuery(scheduledQuery.getScheduleKey());
+      if (!existing.isPresent()) {
+        throw new NoSuchObjectException(
+            "Scheduled query with name: " + 
scheduledQueryKeyRef(scheduledQuery.getScheduleKey()) + " doesn't exists.");
+      }
+      openTransaction();
+      MScheduledQuery persisted = existing.get();
+      persisted.doUpdate(schq);
+      Integer nextExecutionTime = computeNextExecutionTime(schq.getSchedule());
+      if (nextExecutionTime == null) {
+        throw new InvalidInputException("Invalid schedule: " + 
schq.getSchedule());
+      }
+      persisted.setNextExecution(nextExecutionTime);
+      pm.makePersistent(persisted);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  @Override
+  public ScheduledQuery getScheduledQuery(ScheduledQueryKey key) throws 
NoSuchObjectException {
+    Optional<MScheduledQuery> mScheduledQuery = getMScheduledQuery(key);
+    if (!mScheduledQuery.isPresent()) {
+      throw new NoSuchObjectException(
+          "There is no scheduled query for: " + scheduledQueryKeyRef(key));
+    }
+    return mScheduledQuery.get().toThrift();
+
+  }
+
+  private String scheduledQueryKeyRef(ScheduledQueryKey key) {
+    return key.getScheduleName() + "@" + key.getClusterNamespace();
+  }
+
+  public Optional<MScheduledQuery> getMScheduledQuery(ScheduledQueryKey key) {
+    MScheduledQuery s = null;
+    boolean commited = false;
+    Query query = null;
+    try {
+      openTransaction();
+      query = pm.newQuery(MScheduledQuery.class, "scheduleName == sName && 
clusterNamespace == ns");
+      query.declareParameters("java.lang.String sName, java.lang.String ns");
+      query.setUnique(true);
+      s = (MScheduledQuery) query.execute(key.getScheduleName(), 
key.getClusterNamespace());
+      pm.retrieve(s);
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    return Optional.ofNullable(s);
+  }
+
+  @Override
+  public int deleteScheduledExecutions(int maxRetainSecs) {
+    if (maxRetainSecs < 0) {
+      LOG.debug("scheduled executions retention is disabled");
+      return 0;
+    }
+    boolean committed = false;
+    try {
+      openTransaction();
+      int maxCreateTime = (int) (System.currentTimeMillis() / 1000) - 
maxRetainSecs;
+      Query q = pm.newQuery(MScheduledExecution.class);
+      q.setFilter("startTime <= maxCreateTime");
+      q.declareParameters("int maxCreateTime");
+      long deleted = q.deletePersistentAll(maxCreateTime);
+      committed = commitTransaction();
+      return (int) deleted;
+    } finally {
+      if (!committed) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  @Override
+  public int markScheduledExecutionsTimedOut(int timeoutSecs) throws 
InvalidOperationException {
+    if (timeoutSecs < 0) {
+      LOG.debug("scheduled executions - time_out mark is disabled");
+      return 0;
+    }
+    boolean committed = false;
+    try {
+      openTransaction();
+      int maxLastUpdateTime = (int) (System.currentTimeMillis() / 1000) - 
timeoutSecs;
+      Query q = pm.newQuery(MScheduledExecution.class);
+      q.setFilter("lastUpdateTime <= maxLastUpdateTime and (state == 'INITED' 
or state == 'EXECUTING')");
+      q.declareParameters("int maxLastUpdateTime");
+      
+      List<MScheduledExecution> results = (List<MScheduledExecution>) 
q.execute(maxLastUpdateTime);
+      for (MScheduledExecution e : results) {
+        
+        ScheduledQueryProgressInfo info = new ScheduledQueryProgressInfo();
+        info.setScheduledExecutionId(e.getScheduledExecutionId());
+        info.setState(QueryState.TIMED_OUT);
+        info.setErrorMessage(
+            "Query stuck in: " + e.getState() + " state for >" + timeoutSecs + 
" seconds. Execution timed out.");
+        //        info.set
 
 Review comment:
   ?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 324732)
    Time Spent: 2h 20m  (was: 2h 10m)

> Scheduled query support
> -----------------------
>
>                 Key: HIVE-21884
>                 URL: https://issues.apache.org/jira/browse/HIVE-21884
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Zoltan Haindrich
>            Assignee: Zoltan Haindrich
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-21844.04.patch, HIVE-21844.05.patch, 
> HIVE-21844.06.patch, HIVE-21844.07.patch, HIVE-21844.08.patch, 
> HIVE-21844.09.patch, HIVE-21844.15.patch, HIVE-21844.19.patch, 
> HIVE-21884.01.patch, HIVE-21884.02.patch, HIVE-21884.03.patch, 
> HIVE-21884.09.patch, HIVE-21884.10.patch, HIVE-21884.10.patch, 
> HIVE-21884.11.patch, HIVE-21884.12.patch, HIVE-21884.13.patch, 
> HIVE-21884.14.patch, HIVE-21884.14.patch, HIVE-21884.14.patch, 
> HIVE-21884.16.patch, HIVE-21884.17.patch, HIVE-21884.18.patch, 
> HIVE-21884.20.patch, Scheduled queries2.pdf
>
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> design document:
> https://docs.google.com/document/d/1mJSFdJi_1cbxJTXC9QvGw2rQ3zzJkNfxOO6b5esmyCE/edit#
> in case the google doc is not reachable:  [^Scheduled queries2.pdf] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to