This is an automated email from the ASF dual-hosted git repository.

bhliva pushed a commit to branch inactivity_integration
in repository https://gitbox.apache.org/repos/asf/incubator-dlab.git


The following commit(s) were added to refs/heads/inactivity_integration by this push:
     new f8460f3  DLAB-406 fixed bug with inactivity scheduler
f8460f3 is described below

commit f8460f38b062caff27accd3e7e05e8e012fe0872
Author: bhliva <[email protected]>
AuthorDate: Mon Mar 4 17:39:57 2019 +0200

    DLAB-406 fixed bug with inactivity scheduler
---
 .../service/impl/SchedulerJobServiceImpl.java      | 780 ++++++++++-----------
 1 file changed, 390 insertions(+), 390 deletions(-)

diff --git a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
index 63e99e4..7be29e5 100644
--- a/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
+++ b/services/self-service/src/main/java/com/epam/dlab/backendapi/service/impl/SchedulerJobServiceImpl.java
@@ -57,396 +57,396 @@ import static java.util.Date.from;
 @Singleton
 public class SchedulerJobServiceImpl implements SchedulerJobService {
 
-       private static final String SCHEDULER_NOT_FOUND_MSG =
-                       "Scheduler job data not found for user %s with 
exploratory %s";
-       private static final long ALLOWED_INACTIVITY_MINUTES = 1L;
-
-       @Inject
-       private SchedulerJobDAO schedulerJobDAO;
-
-       @Inject
-       private ExploratoryDAO exploratoryDAO;
-
-       @Inject
-       private ComputationalDAO computationalDAO;
-
-       @Inject
-       private ExploratoryService exploratoryService;
-
-       @Inject
-       private ComputationalService computationalService;
-
-       @Inject
-       private SystemUserInfoService systemUserService;
-
-       @Override
-       public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String 
user, String exploratoryName) {
-               return 
schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, 
exploratoryName)
-                               .orElseThrow(() -> new 
ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
-                                               exploratoryName)));
-       }
-
-       @Override
-       public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String 
user, String exploratoryName,
-                                                                               
                                                         String 
computationalName) {
-               return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, 
exploratoryName, computationalName)
-                               .orElseThrow(() -> new 
ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
-                                               exploratoryName) + " with 
computational resource " + computationalName));
-       }
-
-       @Override
-       public void updateExploratorySchedulerData(String user, String 
exploratoryName, SchedulerJobDTO dto) {
-               validateExploratoryStatus(user, exploratoryName);
-               populateDefaultSchedulerValues(dto);
-               log.debug("Updating exploratory {} for user {} with new 
scheduler job data: {}...", exploratoryName, user,
-                               dto);
-               exploratoryDAO.updateSchedulerDataForUserAndExploratory(user, 
exploratoryName, dto);
-               if (dto.isSyncStartRequired()) {
-                       shareSchedulerJobDataToSparkClusters(user, 
exploratoryName, dto);
-               } else {
-                       computationalDAO.updateSchedulerSyncFlag(user, 
exploratoryName, dto.isSyncStartRequired());
-               }
-       }
-
-       @Override
-       public void updateComputationalSchedulerData(String user, String 
exploratoryName, String computationalName,
-                                                                               
                 SchedulerJobDTO dto) {
-               validateExploratoryStatus(user, exploratoryName);
-               validateComputationalStatus(user, exploratoryName, 
computationalName);
-               populateDefaultSchedulerValues(dto);
-               log.debug("Updating computational resource {} affiliated with 
exploratory {} for user {} with new scheduler " +
-                               "job data {}...", computationalName, 
exploratoryName, user, dto);
-               
computationalDAO.updateSchedulerDataForComputationalResource(user, 
exploratoryName, computationalName, dto);
-       }
-
-       @Override
-       public void stopComputationalByScheduler() {
-               getComputationalSchedulersForStopping(OffsetDateTime.now(), 
true)
-                               .forEach(this::stopComputational);
-       }
-
-       @Override
-       public void stopExploratoryByScheduler() {
-               getExploratorySchedulersForStopping(OffsetDateTime.now(), true)
-                               .forEach(this::stopExploratory);
-       }
-
-       @Override
-       public void startExploratoryByScheduler() {
-               getExploratorySchedulersForStarting(OffsetDateTime.now())
-                               .forEach(this::startExploratory);
-       }
-
-       @Override
-       public void startComputationalByScheduler() {
-               getComputationalSchedulersForStarting(OffsetDateTime.now())
-                               .forEach(job -> startSpark(job.getUser(), 
job.getExploratoryName(), job.getComputationalName()));
-       }
-
-       @Override
-       public void terminateExploratoryByScheduler() {
-               getExploratorySchedulersForTerminating(OffsetDateTime.now())
-                               .forEach(this::terminateExploratory);
-
-       }
-
-       @Override
-       public void terminateComputationalByScheduler() {
-               
getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational);
-
-       }
-
-       @Override
-       public void removeScheduler(String user, String exploratoryName) {
-               schedulerJobDAO.removeScheduler(user, exploratoryName);
-       }
-
-       @Override
-       public void removeScheduler(String user, String exploratoryName, String 
computationalName) {
-               schedulerJobDAO.removeScheduler(user, exploratoryName, 
computationalName);
-       }
-
-       @Override
-       public List<SchedulerJobData> getActiveSchedulers(String user, long 
minutesOffset) {
-               final OffsetDateTime desiredDateTime = 
OffsetDateTime.now().plusMinutes(minutesOffset);
-               final Predicate<SchedulerJobData> userPredicate = s -> 
user.equals(s.getUser());
-               final Stream<SchedulerJobData> computationalSchedulersStream =
-                               
getComputationalSchedulersForStopping(desiredDateTime, false)
-                                               .stream()
-                                               .filter(userPredicate);
-               final Stream<SchedulerJobData> exploratorySchedulersStream =
-                               
getExploratorySchedulersForStopping(desiredDateTime, false)
-                                               .stream()
-                                               .filter(userPredicate);
-               return Stream.concat(computationalSchedulersStream, 
exploratorySchedulersStream)
-                               .collect(Collectors.toList());
-       }
-
-       private void stopComputational(SchedulerJobData job) {
-               final String expName = job.getExploratoryName();
-               final String compName = job.getComputationalName();
-               final String user = job.getUser();
-               log.debug("Stopping exploratory {} computational {} for user {} 
by scheduler", expName, compName, user);
-               
computationalService.stopSparkCluster(systemUserService.create(user), expName, 
compName);
-       }
-
-       private void terminateComputational(SchedulerJobData job) {
-               final String user = job.getUser();
-               final String expName = job.getExploratoryName();
-               final String compName = job.getComputationalName();
-               final UserInfo userInfo = systemUserService.create(user);
-               log.debug("Terminating exploratory {} computational {} for user 
{} by scheduler", expName, compName, user);
-               computationalService.terminateComputational(userInfo, expName, 
compName);
-       }
-
-       private void stopExploratory(SchedulerJobData job) {
-               final String expName = job.getExploratoryName();
-               final String user = job.getUser();
-               log.debug("Stopping exploratory {} for user {} by scheduler", 
expName, user);
-               exploratoryService.stop(systemUserService.create(user), 
expName);
-       }
-
-       private List<SchedulerJobData> 
getExploratorySchedulersForTerminating(OffsetDateTime now) {
-               return 
schedulerJobDAO.getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED)
-                               .stream()
-                               
.filter(canSchedulerForTerminatingBeApplied(now))
-                               .collect(Collectors.toList());
-       }
-
-       private List<SchedulerJobData> 
getComputationalSchedulersForTerminating(OffsetDateTime now) {
-               return 
schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING,
-                               DataEngineType.SPARK_STANDALONE, STOPPED, 
RUNNING)
-                               .stream()
-                               
.filter(canSchedulerForTerminatingBeApplied(now))
-                               .collect(Collectors.toList());
-       }
-
-       private void startExploratory(SchedulerJobData schedulerJobData) {
-               final String user = schedulerJobData.getUser();
-               final String exploratoryName = 
schedulerJobData.getExploratoryName();
-               log.debug("Starting exploratory {} for user {} by scheduler", 
exploratoryName, user);
-               exploratoryService.start(systemUserService.create(user), 
exploratoryName);
-               if (schedulerJobData.getJobDTO().isSyncStartRequired()) {
-                       log.trace("Starting computational for exploratory {} 
for user {} by scheduler", exploratoryName, user);
-                       final DataEngineType sparkCluster = 
DataEngineType.SPARK_STANDALONE;
-                       final List<UserComputationalResource> compToBeStarted =
-                                       
computationalDAO.findComputationalResourcesWithStatus(user, exploratoryName, 
STOPPED);
-
-                       compToBeStarted
-                                       .stream()
-                                       .filter(compResource -> 
shouldClusterBeStarted(sparkCluster, compResource))
-                                       .forEach(comp -> startSpark(user, 
exploratoryName, comp.getComputationalName()));
-               }
-       }
-
-       private void terminateExploratory(SchedulerJobData job) {
-               final String user = job.getUser();
-               final String expName = job.getExploratoryName();
-               log.debug("Terminating exploratory {} for user {} by 
scheduler", expName, user);
-               exploratoryService.terminate(systemUserService.create(user), 
expName);
-       }
-
-       private void startSpark(String user, String expName, String compName) {
-               log.debug("Starting exploratory {} computational {} for user {} 
by scheduler", expName, compName, user);
-               
computationalService.startSparkCluster(systemUserService.create(user), expName, 
compName);
-       }
-
-       private boolean shouldClusterBeStarted(DataEngineType sparkCluster, 
UserComputationalResource compResource) {
-               return Objects.nonNull(compResource.getSchedulerData()) && 
compResource.getSchedulerData().isSyncStartRequired()
-                               && 
compResource.getImageName().equals(getDockerImageName(sparkCluster));
-       }
-
-       /**
-        * Performs bulk updating operation with scheduler data for 
corresponding to exploratory Spark clusters.
-        * All these resources will obtain data which is equal to exploratory's 
except 'stopping' operation (it will be
-        * performed automatically with notebook stopping since Spark resources 
have such feature).
-        *
-        * @param user            user's name
-        * @param exploratoryName name of exploratory resource
-        * @param dto             scheduler job data.
-        */
-       private void shareSchedulerJobDataToSparkClusters(String user, String 
exploratoryName, SchedulerJobDTO dto) {
-               List<String> correspondingSparkClusters = 
computationalDAO.getComputationalResourcesWhereStatusIn(user,
-                               singletonList(DataEngineType.SPARK_STANDALONE), 
exploratoryName,
-                               STARTING, RUNNING, STOPPING, STOPPED);
-               SchedulerJobDTO dtoWithoutStopData = 
getSchedulerJobWithoutStopData(dto);
-               for (String sparkName : correspondingSparkClusters) {
-                       log.debug("Updating computational resource {} 
affiliated with exploratory {} for user {} with new " +
-                                       "scheduler job data {}...", sparkName, 
exploratoryName, user, dtoWithoutStopData);
-                       
computationalDAO.updateSchedulerDataForComputationalResource(user, 
exploratoryName, sparkName,
-                                       dtoWithoutStopData);
-               }
-       }
-
-       private List<SchedulerJobData> 
getExploratorySchedulersForStopping(OffsetDateTime currentDateTime,
-                                                                               
                                                           boolean 
checkInactivity) {
-
-               final Date clusterMaxInactivityAllowedDate =
-                               
from(LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault()).toInstant());
-               return 
schedulerJobDAO.getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING,
-                               clusterMaxInactivityAllowedDate)
-                               .stream()
-                               .filter(schedulerJobData -> 
shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
-                                               currentDateTime, 
schedulerJobData.getJobDTO().getStopDaysRepeat(),
-                                               
schedulerJobData.getJobDTO().getEndTime()) ||
-                                               (checkInactivity && 
exploratoryInactivityCondition(schedulerJobData)))
-                               .collect(Collectors.toList());
-       }
-
-       private List<SchedulerJobData> 
getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) {
-               return 
schedulerJobDAO.getExploratorySchedulerDataWithStatus(STOPPED)
-                               .stream()
-                               
.filter(canSchedulerForStartingBeApplied(currentDateTime))
-                               .collect(Collectors.toList());
-       }
-
-       private List<SchedulerJobData> 
getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) {
-               return schedulerJobDAO
-                               
.getComputationalSchedulerDataWithOneOfStatus(RUNNING, 
DataEngineType.SPARK_STANDALONE, STOPPED)
-                               .stream()
-                               
.filter(canSchedulerForStartingBeApplied(currentDateTime))
-                               .collect(Collectors.toList());
-       }
-
-
-       private Predicate<SchedulerJobData> 
canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) {
-               return schedulerJobData -> 
shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
-                               currentDateTime, 
schedulerJobData.getJobDTO().getStartDaysRepeat(),
-                               schedulerJobData.getJobDTO().getStartTime());
-       }
-
-       private Predicate<SchedulerJobData> 
canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) {
-               return schedulerJobData -> shouldBeTerminated(currentDateTime, 
schedulerJobData);
-       }
-
-       private boolean shouldBeTerminated(OffsetDateTime currentDateTime, 
SchedulerJobData schedulerJobData) {
-               final SchedulerJobDTO jobDTO = schedulerJobData.getJobDTO();
-               final LocalDateTime convertedCurrentTime = 
schedulerExecutionDate(jobDTO, currentDateTime);
-               return isSchedulerActive(schedulerJobData.getJobDTO(), 
convertedCurrentTime) && Objects.nonNull(jobDTO.getTerminateDateTime()) &&
-                               
convertedCurrentTime.equals(jobDTO.getTerminateDateTime());
-       }
-
-       private List<SchedulerJobData> 
getComputationalSchedulersForStopping(OffsetDateTime currentDateTime,
-                                                                               
                                                                 boolean 
checkInactivity) {
-               return schedulerJobDAO
-                               
.getComputationalSchedulerDataWithOneOfStatus(RUNNING, 
DataEngineType.SPARK_STANDALONE, RUNNING)
-                               .stream()
-                               .filter(schedulerJobData -> 
shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(),
-                                               currentDateTime, 
schedulerJobData.getJobDTO().getStopDaysRepeat(),
-                                               
schedulerJobData.getJobDTO().getEndTime()) ||
-                                               (checkInactivity && 
computationalInactivityCondition(schedulerJobData)))
-                               .collect(Collectors.toList());
-       }
-
-       private boolean computationalInactivityCondition(SchedulerJobData 
jobData) {
-               final SchedulerJobDTO schedulerData = jobData.getJobDTO();
-               return schedulerData.isCheckInactivityRequired() && 
computationalInactivityExceed(jobData, schedulerData);
-       }
-
-       private boolean computationalInactivityExceed(SchedulerJobData 
schedulerJobData, SchedulerJobDTO schedulerData) {
-               final String explName = schedulerJobData.getExploratoryName();
-               final String compName = schedulerJobData.getComputationalName();
-               final String user = schedulerJobData.getUser();
-               final UserComputationalResource c = 
computationalDAO.fetchComputationalFields(user, explName, compName);
-               final Long maxInactivity = schedulerData.getMaxInactivity();
-               return inactivityCondition(maxInactivity, c.getStatus(), 
c.getLastActivity());
-       }
-
-       private boolean exploratoryInactivityCondition(SchedulerJobData 
jobData) {
-               final SchedulerJobDTO schedulerData = jobData.getJobDTO();
-               return schedulerData.isCheckInactivityRequired() && 
exploratoryInactivityExceed(jobData, schedulerData);
-       }
-
-       private boolean exploratoryInactivityExceed(SchedulerJobData 
schedulerJobData, SchedulerJobDTO schedulerData) {
-               final String expName = schedulerJobData.getExploratoryName();
-               final String user = schedulerJobData.getUser();
-               final UserInstanceDTO userInstanceDTO = 
exploratoryDAO.fetchExploratoryFields(user, expName, true);
-               final boolean canBeStopped = userInstanceDTO.getResources()
-                               .stream()
-                               .map(UserComputationalResource::getStatus)
-                               .map(UserInstanceStatus::of)
-                               .noneMatch(status -> status.in(TERMINATING, 
CONFIGURING, CREATING));
-               return canBeStopped && 
inactivityCondition(schedulerData.getMaxInactivity(), 
userInstanceDTO.getStatus(),
-                               userInstanceDTO.getLastActivity());
-       }
-
-       private boolean inactivityCondition(Long maxInactivity, String status, 
LocalDateTime lastActivity) {
-               return UserInstanceStatus.RUNNING.toString().equals(status) &&
-                               Optional.ofNullable(lastActivity)
-                                               .map(la -> 
la.plusMinutes(maxInactivity).isBefore(LocalDateTime.now()))
-                                               .orElse(Boolean.FALSE);
-       }
-
-       private void populateDefaultSchedulerValues(SchedulerJobDTO dto) {
-               if (Objects.isNull(dto.getBeginDate()) || 
StringUtils.isBlank(dto.getBeginDate().toString())) {
-                       dto.setBeginDate(LocalDate.now());
-               }
-               if (Objects.isNull(dto.getTimeZoneOffset()) || 
StringUtils.isBlank(dto.getTimeZoneOffset().toString())) {
-                       
dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset());
-               }
-       }
-
-       private void validateExploratoryStatus(String user, String 
exploratoryName) {
-               final UserInstanceDTO userInstance = 
exploratoryDAO.fetchExploratoryFields(user, exploratoryName);
-               validateResourceStatus(userInstance.getStatus());
-       }
-
-       private void validateComputationalStatus(String user, String 
exploratoryName, String computationalName) {
-               final UserComputationalResource computationalResource =
-                               computationalDAO.fetchComputationalFields(user, 
exploratoryName, computationalName);
-               final String computationalStatus = 
computationalResource.getStatus();
-               validateResourceStatus(computationalStatus);
-       }
-
-       private void validateResourceStatus(String resourceStatus) {
-               final UserInstanceStatus status = 
UserInstanceStatus.of(resourceStatus);
-               if (Objects.isNull(status) || 
status.in(UserInstanceStatus.TERMINATED, TERMINATING,
-                               UserInstanceStatus.FAILED)) {
-                       throw new 
ResourceInappropriateStateException(String.format("Can not create/update 
scheduler for user " +
-                                       "instance with status: %s", status));
-               }
-       }
-
-       private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, 
OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
-                                                                               
          LocalTime time) {
-               LocalDateTime convertedDateTime = schedulerExecutionDate(dto, 
dateTime);
-
-               return isSchedulerActive(dto, convertedDateTime)
-                               && 
daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
-                               && convertedDateTime.toLocalTime().equals(time);
-       }
-
-       private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime 
convertedDateTime) {
-               return 
!convertedDateTime.toLocalDate().isBefore(dto.getBeginDate())
-                               && finishDateAfterCurrentDate(dto, 
convertedDateTime);
-       }
-
-       private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, 
OffsetDateTime dateTime) {
-               ZoneOffset zOffset = dto.getTimeZoneOffset();
-               OffsetDateTime roundedDateTime = OffsetDateTime.of(
-                               dateTime.toLocalDate(),
-                               LocalTime.of(dateTime.toLocalTime().getHour(), 
dateTime.toLocalTime().getMinute()),
-                               dateTime.getOffset());
-
-               return ZonedDateTime.ofInstant(roundedDateTime.toInstant(),
-                               ZoneId.ofOffset(TIMEZONE_PREFIX, 
zOffset)).toLocalDateTime();
-       }
-
-       private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, 
LocalDateTime currentDateTime) {
-               return Objects.isNull(dto.getFinishDate()) || 
!currentDateTime.toLocalDate().isAfter(dto.getFinishDate());
-       }
-
-       private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO 
dto) {
-               SchedulerJobDTO convertedDto = new SchedulerJobDTO();
-               convertedDto.setBeginDate(dto.getBeginDate());
-               convertedDto.setFinishDate(dto.getFinishDate());
-               convertedDto.setStartTime(dto.getStartTime());
-               convertedDto.setStartDaysRepeat(dto.getStartDaysRepeat());
-               convertedDto.setTerminateDateTime(dto.getTerminateDateTime());
-               convertedDto.setTimeZoneOffset(dto.getTimeZoneOffset());
-               convertedDto.setSyncStartRequired(dto.isSyncStartRequired());
-               return convertedDto;
-       }
+    private static final String SCHEDULER_NOT_FOUND_MSG =
+            "Scheduler job data not found for user %s with exploratory %s";
+    private static final long ALLOWED_INACTIVITY_MINUTES = 1L;
+
+    @Inject
+    private SchedulerJobDAO schedulerJobDAO;
+
+    @Inject
+    private ExploratoryDAO exploratoryDAO;
+
+    @Inject
+    private ComputationalDAO computationalDAO;
+
+    @Inject
+    private ExploratoryService exploratoryService;
+
+    @Inject
+    private ComputationalService computationalService;
+
+    @Inject
+    private SystemUserInfoService systemUserService;
+
+    @Override
+    public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String user, 
String exploratoryName) {
+        return 
schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, 
exploratoryName)
+                .orElseThrow(() -> new 
ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
+                        exploratoryName)));
+    }
+
+    @Override
+    public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String 
user, String exploratoryName,
+                                                                     String 
computationalName) {
+        return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, 
exploratoryName, computationalName)
+                .orElseThrow(() -> new 
ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user,
+                        exploratoryName) + " with computational resource " + 
computationalName));
+    }
+
+    @Override
+    public void updateExploratorySchedulerData(String user, String 
exploratoryName, SchedulerJobDTO dto) {
+        validateExploratoryStatus(user, exploratoryName);
+        populateDefaultSchedulerValues(dto);
+        log.debug("Updating exploratory {} for user {} with new scheduler job 
data: {}...", exploratoryName, user,
+                dto);
+        exploratoryDAO.updateSchedulerDataForUserAndExploratory(user, 
exploratoryName, dto);
+        if (dto.isSyncStartRequired()) {
+            shareSchedulerJobDataToSparkClusters(user, exploratoryName, dto);
+        } else {
+            computationalDAO.updateSchedulerSyncFlag(user, exploratoryName, 
dto.isSyncStartRequired());
+        }
+    }
+
+    @Override
+    public void updateComputationalSchedulerData(String user, String 
exploratoryName, String computationalName,
+                                                 SchedulerJobDTO dto) {
+        validateExploratoryStatus(user, exploratoryName);
+        validateComputationalStatus(user, exploratoryName, computationalName);
+        populateDefaultSchedulerValues(dto);
+        log.debug("Updating computational resource {} affiliated with 
exploratory {} for user {} with new scheduler " +
+                "job data {}...", computationalName, exploratoryName, user, 
dto);
+        computationalDAO.updateSchedulerDataForComputationalResource(user, 
exploratoryName, computationalName, dto);
+    }
+
+    @Override
+    public void stopComputationalByScheduler() {
+        getComputationalSchedulersForStopping(OffsetDateTime.now(), true)
+                .forEach(this::stopComputational);
+    }
+
+    @Override
+    public void stopExploratoryByScheduler() {
+        getExploratorySchedulersForStopping(OffsetDateTime.now(), true)
+                .forEach(this::stopExploratory);
+    }
+
+    @Override
+    public void startExploratoryByScheduler() {
+        getExploratorySchedulersForStarting(OffsetDateTime.now())
+                .forEach(this::startExploratory);
+    }
+
+    @Override
+    public void startComputationalByScheduler() {
+        getComputationalSchedulersForStarting(OffsetDateTime.now())
+                .forEach(job -> startSpark(job.getUser(), 
job.getExploratoryName(), job.getComputationalName()));
+    }
+
+    @Override
+    public void terminateExploratoryByScheduler() {
+        getExploratorySchedulersForTerminating(OffsetDateTime.now())
+                .forEach(this::terminateExploratory);
+
+    }
+
+    @Override
+    public void terminateComputationalByScheduler() {
+        
getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational);
+
+    }
+
+    @Override
+    public void removeScheduler(String user, String exploratoryName) {
+        schedulerJobDAO.removeScheduler(user, exploratoryName);
+    }
+
+    @Override
+    public void removeScheduler(String user, String exploratoryName, String 
computationalName) {
+        schedulerJobDAO.removeScheduler(user, exploratoryName, 
computationalName);
+    }
+
+    @Override
+    public List<SchedulerJobData> getActiveSchedulers(String user, long 
minutesOffset) {
+        final OffsetDateTime desiredDateTime = 
OffsetDateTime.now().plusMinutes(minutesOffset);
+        final Predicate<SchedulerJobData> userPredicate = s -> 
user.equals(s.getUser());
+        final Stream<SchedulerJobData> computationalSchedulersStream =
+                getComputationalSchedulersForStopping(desiredDateTime, false)
+                        .stream()
+                        .filter(userPredicate);
+        final Stream<SchedulerJobData> exploratorySchedulersStream =
+                getExploratorySchedulersForStopping(desiredDateTime, false)
+                        .stream()
+                        .filter(userPredicate);
+        return Stream.concat(computationalSchedulersStream, 
exploratorySchedulersStream)
+                .collect(Collectors.toList());
+    }
+
+    private void stopComputational(SchedulerJobData job) {
+        final String expName = job.getExploratoryName();
+        final String compName = job.getComputationalName();
+        final String user = job.getUser();
+        log.debug("Stopping exploratory {} computational {} for user {} by 
scheduler", expName, compName, user);
+        computationalService.stopSparkCluster(systemUserService.create(user), 
expName, compName);
+    }
+
+    private void terminateComputational(SchedulerJobData job) {
+        final String user = job.getUser();
+        final String expName = job.getExploratoryName();
+        final String compName = job.getComputationalName();
+        final UserInfo userInfo = systemUserService.create(user);
+        log.debug("Terminating exploratory {} computational {} for user {} by 
scheduler", expName, compName, user);
+        computationalService.terminateComputational(userInfo, expName, 
compName);
+    }
+
+    private void stopExploratory(SchedulerJobData job) {
+        final String expName = job.getExploratoryName();
+        final String user = job.getUser();
+        log.debug("Stopping exploratory {} for user {} by scheduler", expName, 
user);
+        exploratoryService.stop(systemUserService.create(user), expName);
+    }
+
+    /**
+     * Collects scheduler jobs of RUNNING or STOPPED exploratories whose termination moment
+     * matches the given time.
+     *
+     * @param now moment the schedulers are evaluated against
+     * @return scheduler jobs that should be terminated now
+     */
+    private List<SchedulerJobData> getExploratorySchedulersForTerminating(OffsetDateTime now) {
+        final Predicate<SchedulerJobData> dueForTermination = canSchedulerForTerminatingBeApplied(now);
+        return schedulerJobDAO
+                .getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED)
+                .stream()
+                .filter(dueForTermination)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Collects scheduler jobs of Spark standalone clusters whose termination moment matches
+     * the given time.
+     * NOTE(review): the DAO is queried with (RUNNING, SPARK_STANDALONE, STOPPED, RUNNING);
+     * presumably the first status is the exploratory status and the trailing ones are the
+     * accepted computational statuses - confirm against the DAO.
+     *
+     * @param now moment the schedulers are evaluated against
+     * @return scheduler jobs that should be terminated now
+     */
+    private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) {
+        final Predicate<SchedulerJobData> dueForTermination = canSchedulerForTerminatingBeApplied(now);
+        return schedulerJobDAO
+                .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE,
+                        STOPPED, RUNNING)
+                .stream()
+                .filter(dueForTermination)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Starts the exploratory referenced by the given scheduler job and, when the job requires
+     * a synchronized start, also starts its STOPPED Spark clusters that are themselves
+     * configured for synchronized start.
+     *
+     * @param schedulerJobData scheduler job data providing the user, exploratory name and job settings
+     */
+    private void startExploratory(SchedulerJobData schedulerJobData) {
+        final String user = schedulerJobData.getUser();
+        final String exploratoryName = schedulerJobData.getExploratoryName();
+        log.debug("Starting exploratory {} for user {} by scheduler", exploratoryName, user);
+        exploratoryService.start(systemUserService.create(user), exploratoryName);
+
+        if (!schedulerJobData.getJobDTO().isSyncStartRequired()) {
+            return;
+        }
+
+        log.trace("Starting computational for exploratory {} for user {} by scheduler", exploratoryName, user);
+        final List<UserComputationalResource> stoppedClusters =
+                computationalDAO.findComputationalResourcesWithStatus(user, exploratoryName, STOPPED);
+        for (UserComputationalResource cluster : stoppedClusters) {
+            if (shouldClusterBeStarted(DataEngineType.SPARK_STANDALONE, cluster)) {
+                startSpark(user, exploratoryName, cluster.getComputationalName());
+            }
+        }
+    }
+
+    /**
+     * Terminates the exploratory referenced by the given scheduler job, acting as a system
+     * user created for the job's owner.
+     *
+     * @param job scheduler job data providing the user and exploratory names
+     */
+    private void terminateExploratory(SchedulerJobData job) {
+        final String owner = job.getUser();
+        final String notebookName = job.getExploratoryName();
+        log.debug("Terminating exploratory {} for user {} by scheduler", notebookName, owner);
+        final UserInfo schedulerUser = systemUserService.create(owner);
+        exploratoryService.terminate(schedulerUser, notebookName);
+    }
+
+    /**
+     * Starts the named Spark cluster of an exploratory, acting as a system user created for
+     * the given user.
+     *
+     * @param user     name of the user owning the resources
+     * @param expName  name of the exploratory the cluster belongs to
+     * @param compName name of the Spark cluster to start
+     */
+    private void startSpark(String user, String expName, String compName) {
+        log.debug("Starting exploratory {} computational {} for user {} by scheduler", expName, compName, user);
+        final UserInfo schedulerUser = systemUserService.create(user);
+        computationalService.startSparkCluster(schedulerUser, expName, compName);
+    }
+
+    /**
+     * Decides whether a computational resource should be started together with its exploratory.
+     *
+     * @param sparkCluster data engine type whose docker image name the resource must match
+     * @param compResource computational resource being examined
+     * @return {@code true} when the resource has scheduler data with synchronized start enabled
+     * and its image name equals the docker image name of {@code sparkCluster}
+     */
+    private boolean shouldClusterBeStarted(DataEngineType sparkCluster, UserComputationalResource compResource) {
+        if (Objects.isNull(compResource.getSchedulerData())) {
+            return false;
+        }
+        if (!compResource.getSchedulerData().isSyncStartRequired()) {
+            return false;
+        }
+        return compResource.getImageName().equals(getDockerImageName(sparkCluster));
+    }
+
+    /**
+     * Bulk-updates the scheduler data of the Spark clusters that belong to the given exploratory.
+     * Each cluster in STARTING, RUNNING, STOPPING or STOPPED status receives a copy of the
+     * exploratory's scheduler data without the 'stopping' part (stopping is performed
+     * automatically together with the notebook, since Spark resources have such a feature).
+     *
+     * @param user            user's name
+     * @param exploratoryName name of exploratory resource
+     * @param dto             scheduler job data of the exploratory
+     */
+    private void shareSchedulerJobDataToSparkClusters(String user, String exploratoryName, SchedulerJobDTO dto) {
+        final List<String> sparkClusterNames = computationalDAO.getComputationalResourcesWhereStatusIn(user,
+                singletonList(DataEngineType.SPARK_STANDALONE), exploratoryName,
+                STARTING, RUNNING, STOPPING, STOPPED);
+        final SchedulerJobDTO startOnlyDto = getSchedulerJobWithoutStopData(dto);
+        sparkClusterNames.forEach(clusterName -> {
+            log.debug("Updating computational resource {} affiliated with exploratory {} for user {} " +
+                    "with new scheduler job data {}...", clusterName, exploratoryName, user, startOnlyDto);
+            computationalDAO.updateSchedulerDataForComputationalResource(user, exploratoryName, clusterName,
+                    startOnlyDto);
+        });
+    }
+
+    /**
+     * Collects scheduler jobs of RUNNING exploratories that should be stopped: either their
+     * scheduled stop day/time matches {@code currentDateTime}, or (only when
+     * {@code checkInactivity} is set) their inactivity condition is met.
+     * The DAO query is additionally bounded by a date lying {@code ALLOWED_INACTIVITY_MINUTES}
+     * before the current system time.
+     * NOTE(review): that bound is applied even when {@code checkInactivity} is false - confirm
+     * this is intended for purely time-based stops.
+     *
+     * @param currentDateTime moment the stop schedule is evaluated against
+     * @param checkInactivity whether inactivity should also trigger stopping
+     * @return scheduler jobs whose exploratories should be stopped
+     */
+    private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime,
+                                                                       boolean checkInactivity) {
+        final ZonedDateTime inactivityThreshold =
+                LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault());
+        final Date clusterMaxInactivityAllowedDate = from(inactivityThreshold.toInstant());
+
+        final Predicate<SchedulerJobData> stopTimeReached = job -> shouldSchedulerBeExecuted(job.getJobDTO(),
+                currentDateTime, job.getJobDTO().getStopDaysRepeat(), job.getJobDTO().getEndTime());
+        final Predicate<SchedulerJobData> inactivityExceeded =
+                job -> checkInactivity && exploratoryInactivityCondition(job);
+
+        return schedulerJobDAO
+                .getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING,
+                        clusterMaxInactivityAllowedDate)
+                .stream()
+                .filter(stopTimeReached.or(inactivityExceeded))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Collects scheduler jobs of STOPPED exploratories whose scheduled start day/time matches
+     * the given moment.
+     *
+     * @param currentDateTime moment the start schedule is evaluated against
+     * @return scheduler jobs whose exploratories should be started
+     */
+    private List<SchedulerJobData> getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) {
+        final Predicate<SchedulerJobData> dueForStart = canSchedulerForStartingBeApplied(currentDateTime);
+        return schedulerJobDAO
+                .getExploratorySchedulerDataWithStatus(STOPPED)
+                .stream()
+                .filter(dueForStart)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Collects scheduler jobs of Spark standalone clusters whose scheduled start day/time
+     * matches the given moment.
+     * NOTE(review): the DAO is queried with (RUNNING, SPARK_STANDALONE, STOPPED); presumably
+     * this means STOPPED clusters of RUNNING exploratories - confirm against the DAO.
+     *
+     * @param currentDateTime moment the start schedule is evaluated against
+     * @return scheduler jobs whose clusters should be started
+     */
+    private List<SchedulerJobData> getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) {
+        final Predicate<SchedulerJobData> dueForStart = canSchedulerForStartingBeApplied(currentDateTime);
+        return schedulerJobDAO
+                .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED)
+                .stream()
+                .filter(dueForStart)
+                .collect(Collectors.toList());
+    }
+
+
+    /**
+     * Builds a predicate that accepts scheduler jobs whose start days and start time match
+     * the given moment.
+     *
+     * @param currentDateTime moment the start schedule is evaluated against
+     * @return predicate delegating to {@code shouldSchedulerBeExecuted} with the job's start settings
+     */
+    private Predicate<SchedulerJobData> canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) {
+        return job -> {
+            final SchedulerJobDTO settings = job.getJobDTO();
+            return shouldSchedulerBeExecuted(settings, currentDateTime,
+                    settings.getStartDaysRepeat(), settings.getStartTime());
+        };
+    }
+
+    /**
+     * Builds a predicate that accepts scheduler jobs which should be terminated at the given moment.
+     *
+     * @param currentDateTime moment the termination schedule is evaluated against
+     * @return predicate delegating to {@code shouldBeTerminated}
+     */
+    private Predicate<SchedulerJobData> canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) {
+        return job -> shouldBeTerminated(currentDateTime, job);
+    }
+
+    /**
+     * Checks whether the resource of the given scheduler job must be terminated now.
+     *
+     * @param currentDateTime  moment the termination schedule is evaluated against
+     * @param schedulerJobData scheduler job being examined
+     * @return {@code true} when the scheduler is active and its terminate date-time is set and
+     * equals the current moment, rounded to the minute and converted to the scheduler's offset
+     */
+    private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) {
+        final SchedulerJobDTO settings = schedulerJobData.getJobDTO();
+        final LocalDateTime nowInSchedulerOffset = schedulerExecutionDate(settings, currentDateTime);
+        if (!isSchedulerActive(settings, nowInSchedulerOffset)) {
+            return false;
+        }
+        if (Objects.isNull(settings.getTerminateDateTime())) {
+            return false;
+        }
+        return nowInSchedulerOffset.equals(settings.getTerminateDateTime());
+    }
+
+    /**
+     * Collects scheduler jobs of Spark standalone clusters that should be stopped: either
+     * their scheduled stop day/time matches {@code currentDateTime}, or (only when
+     * {@code checkInactivity} is set) their inactivity condition is met.
+     *
+     * @param currentDateTime moment the stop schedule is evaluated against
+     * @param checkInactivity whether inactivity should also trigger stopping
+     * @return scheduler jobs whose clusters should be stopped
+     */
+    private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime,
+                                                                         boolean checkInactivity) {
+        final Predicate<SchedulerJobData> stopTimeReached = job -> shouldSchedulerBeExecuted(job.getJobDTO(),
+                currentDateTime, job.getJobDTO().getStopDaysRepeat(), job.getJobDTO().getEndTime());
+        final Predicate<SchedulerJobData> inactivityExceeded =
+                job -> checkInactivity && computationalInactivityCondition(job);
+
+        return schedulerJobDAO
+                .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING)
+                .stream()
+                .filter(stopTimeReached.or(inactivityExceeded))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Tells whether a computational resource should be stopped because of inactivity.
+     *
+     * @param jobData scheduler job of the computational resource
+     * @return {@code true} when the job has the inactivity check enabled and the resource's
+     * inactivity limit is exceeded
+     */
+    private boolean computationalInactivityCondition(SchedulerJobData jobData) {
+        final SchedulerJobDTO settings = jobData.getJobDTO();
+        if (!settings.isCheckInactivityRequired()) {
+            return false;
+        }
+        return computationalInactivityExceed(jobData, settings);
+    }
+
+    /**
+     * Fetches the computational resource of the given job and evaluates the inactivity
+     * condition on its status and last activity.
+     *
+     * @param schedulerJobData scheduler job identifying the user, exploratory and computational resource
+     * @param schedulerData    scheduler settings providing the maximum allowed inactivity
+     * @return result of {@code inactivityCondition} for the fetched resource
+     */
+    private boolean computationalInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
+        final UserComputationalResource cluster = computationalDAO.fetchComputationalFields(
+                schedulerJobData.getUser(),
+                schedulerJobData.getExploratoryName(),
+                schedulerJobData.getComputationalName());
+        return inactivityCondition(schedulerData.getMaxInactivity(), cluster.getStatus(), cluster.getLastActivity());
+    }
+
+    /**
+     * Tells whether an exploratory should be stopped because of inactivity.
+     *
+     * @param jobData scheduler job of the exploratory
+     * @return {@code true} when the job has the inactivity check enabled and the exploratory's
+     * inactivity limit is exceeded
+     */
+    private boolean exploratoryInactivityCondition(SchedulerJobData jobData) {
+        final SchedulerJobDTO settings = jobData.getJobDTO();
+        if (!settings.isCheckInactivityRequired()) {
+            return false;
+        }
+        return exploratoryInactivityExceed(jobData, settings);
+    }
+
+    /**
+     * Fetches the exploratory of the given job and decides whether it may be stopped for
+     * inactivity. Stopping is refused while any of its computational resources is in
+     * TERMINATING, CONFIGURING or CREATING status; otherwise the inactivity condition is
+     * evaluated on the exploratory's own status and last activity.
+     * NOTE(review): the original status list named CREATING twice, which suggests another
+     * status was intended in its place - confirm with the author.
+     *
+     * @param schedulerJobData scheduler job identifying the user and exploratory
+     * @param schedulerData    scheduler settings providing the maximum allowed inactivity
+     * @return {@code true} when no computational resource blocks stopping and the inactivity
+     * condition holds for the exploratory
+     */
+    private boolean exploratoryInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) {
+        final String expName = schedulerJobData.getExploratoryName();
+        final String user = schedulerJobData.getUser();
+        final UserInstanceDTO userInstanceDTO = exploratoryDAO.fetchExploratoryFields(user, expName, true);
+        final boolean canBeStopped = userInstanceDTO.getResources()
+                .stream()
+                .map(UserComputationalResource::getStatus)
+                .map(UserInstanceStatus::of)
+                // UserInstanceStatus.of may yield null for an unrecognised status (see the null check in
+                // validateResourceStatus); guard so one odd resource cannot abort the whole check with an NPE.
+                .noneMatch(status -> Objects.nonNull(status) && status.in(TERMINATING, CONFIGURING, CREATING));
+        return canBeStopped && inactivityCondition(schedulerData.getMaxInactivity(), userInstanceDTO.getStatus(),
+                userInstanceDTO.getLastActivity());
+    }
+
+    /**
+     * Evaluates whether a resource has been inactive for longer than allowed.
+     *
+     * @param maxInactivity maximum allowed inactivity in minutes; {@code null} means no limit is
+     *                      configured and the condition is never met
+     * @param status        textual status of the resource
+     * @param lastActivity  moment of the last recorded activity; {@code null} when none is recorded
+     * @return {@code true} when the resource is RUNNING and {@code lastActivity} plus
+     * {@code maxInactivity} minutes lies before the current system time
+     */
+    private boolean inactivityCondition(Long maxInactivity, String status, LocalDateTime lastActivity) {
+        // A null limit would be auto-unboxed in plusMinutes and throw a NullPointerException.
+        if (Objects.isNull(maxInactivity) || Objects.isNull(lastActivity)) {
+            return false;
+        }
+        return UserInstanceStatus.RUNNING.toString().equals(status)
+                && lastActivity.plusMinutes(maxInactivity).isBefore(LocalDateTime.now());
+    }
+
+    /**
+     * Fills in missing scheduler settings with defaults: the begin date defaults to today and
+     * the time zone offset defaults to the current offset of the system default zone.
+     *
+     * @param dto scheduler settings to complete in place
+     */
+    private void populateDefaultSchedulerValues(SchedulerJobDTO dto) {
+        // The former isBlank(x.toString()) checks were dead code: LocalDate.toString() and
+        // ZoneOffset.toString() never produce a blank string, so a null check is sufficient.
+        if (Objects.isNull(dto.getBeginDate())) {
+            dto.setBeginDate(LocalDate.now());
+        }
+        if (Objects.isNull(dto.getTimeZoneOffset())) {
+            dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset());
+        }
+    }
+
+    /**
+     * Validates that the status of the given exploratory allows creating or updating a scheduler.
+     *
+     * @param user            user's name
+     * @param exploratoryName name of the exploratory to check
+     */
+    private void validateExploratoryStatus(String user, String exploratoryName) {
+        validateResourceStatus(exploratoryDAO.fetchExploratoryFields(user, exploratoryName).getStatus());
+    }
+
+    /**
+     * Validates that the status of the given computational resource allows creating or
+     * updating a scheduler.
+     *
+     * @param user              user's name
+     * @param exploratoryName   name of the exploratory the resource belongs to
+     * @param computationalName name of the computational resource to check
+     */
+    private void validateComputationalStatus(String user, String exploratoryName, String computationalName) {
+        final UserComputationalResource cluster =
+                computationalDAO.fetchComputationalFields(user, exploratoryName, computationalName);
+        validateResourceStatus(cluster.getStatus());
+    }
+
+    /**
+     * Rejects resource statuses for which a scheduler must not be created or updated.
+     *
+     * @param resourceStatus textual status of the resource
+     * @throws ResourceInappropriateStateException when the status cannot be mapped to a
+     *                                             {@code UserInstanceStatus} or is TERMINATED,
+     *                                             TERMINATING or FAILED
+     */
+    private void validateResourceStatus(String resourceStatus) {
+        final UserInstanceStatus status = UserInstanceStatus.of(resourceStatus);
+        if (Objects.isNull(status) || status.in(UserInstanceStatus.TERMINATED, TERMINATING,
+                UserInstanceStatus.FAILED)) {
+            // For an unmapped status report the raw value; the mapped one would print as "null".
+            final Object reportedStatus = Objects.isNull(status) ? resourceStatus : status;
+            throw new ResourceInappropriateStateException(String.format("Can not create/update scheduler for user " +
+                    "instance with status: %s", reportedStatus));
+        }
+    }
+
+    /**
+     * Checks whether a scheduled action is due at the given moment.
+     * The moment is rounded to the minute and converted to the scheduler's time zone offset
+     * before it is compared, so {@code time} matches only with minute precision.
+     *
+     * @param dto        scheduler settings (begin/finish dates and time zone offset)
+     * @param dateTime   moment being evaluated
+     * @param daysRepeat days of week on which the action repeats; {@code null} is treated as
+     *                   "no days", so the action is never due
+     * @param time       time of day of the action; {@code null} never matches
+     * @return {@code true} when the scheduler is active, the converted day of week is in
+     * {@code daysRepeat} and the converted time of day equals {@code time}
+     */
+    private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat,
+                                              LocalTime time) {
+        LocalDateTime convertedDateTime = schedulerExecutionDate(dto, dateTime);
+
+        return isSchedulerActive(dto, convertedDateTime)
+                // Settings built without stop data (see getSchedulerJobWithoutStopData) are also passed
+                // here; guard in case their days list is null rather than empty.
+                && Objects.nonNull(daysRepeat)
+                && daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek())
+                && convertedDateTime.toLocalTime().equals(time);
+    }
+
+    /**
+     * Checks whether the given date falls within the scheduler's validity period.
+     *
+     * @param dto               scheduler settings providing the begin and finish dates
+     * @param convertedDateTime moment to check, already converted to the scheduler's offset
+     * @return {@code true} when the date is not before the begin date and not after the
+     * finish date (if one is set)
+     */
+    private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime convertedDateTime) {
+        if (convertedDateTime.toLocalDate().isBefore(dto.getBeginDate())) {
+            return false;
+        }
+        return finishDateAfterCurrentDate(dto, convertedDateTime);
+    }
+
+    /**
+     * Converts the given moment to a local date-time in the scheduler's time zone offset,
+     * dropping seconds and nanoseconds.
+     *
+     * @param dto      scheduler settings providing the time zone offset
+     * @param dateTime moment to convert
+     * @return the same instant, rounded down to the minute, expressed at the scheduler's offset
+     */
+    private LocalDateTime schedulerExecutionDate(SchedulerJobDTO dto, OffsetDateTime dateTime) {
+        final ZoneOffset schedulerOffset = dto.getTimeZoneOffset();
+        // Keep date, hour, minute and offset; zero out seconds and nanoseconds.
+        final OffsetDateTime roundedToMinute = dateTime.withSecond(0).withNano(0);
+        final ZoneId schedulerZone = ZoneId.ofOffset(TIMEZONE_PREFIX, schedulerOffset);
+        final ZonedDateTime inSchedulerZone = ZonedDateTime.ofInstant(roundedToMinute.toInstant(), schedulerZone);
+        return inSchedulerZone.toLocalDateTime();
+    }
+
+    /**
+     * Checks that the scheduler's finish date has not yet passed.
+     *
+     * @param dto             scheduler settings providing the optional finish date
+     * @param currentDateTime moment to check
+     * @return {@code true} when no finish date is set or the given date is not after it
+     */
+    private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) {
+        if (Objects.isNull(dto.getFinishDate())) {
+            return true;
+        }
+        return !currentDateTime.toLocalDate().isAfter(dto.getFinishDate());
+    }
+
+    /**
+     * Creates a copy of the given scheduler settings that carries only the begin/finish dates,
+     * start time and start days, terminate date-time, time zone offset and the synchronized
+     * start flag. The end time, stop days and inactivity settings are not copied and keep the
+     * defaults of a new {@code SchedulerJobDTO}.
+     *
+     * @param dto scheduler settings to copy from
+     * @return new settings object without stop data
+     */
+    private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO dto) {
+        final SchedulerJobDTO startOnly = new SchedulerJobDTO();
+        // validity period and time zone
+        startOnly.setBeginDate(dto.getBeginDate());
+        startOnly.setFinishDate(dto.getFinishDate());
+        startOnly.setTimeZoneOffset(dto.getTimeZoneOffset());
+        // start schedule
+        startOnly.setStartTime(dto.getStartTime());
+        startOnly.setStartDaysRepeat(dto.getStartDaysRepeat());
+        startOnly.setSyncStartRequired(dto.isSyncStartRequired());
+        // termination
+        startOnly.setTerminateDateTime(dto.getTerminateDateTime());
+        return startOnly;
+    }
 
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to