elguardian commented on code in PR #2247:
URL: 
https://github.com/apache/incubator-kie-kogito-apps/pull/2247#discussion_r2287428351


##########
jobs/jobs-common/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.impl;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.kie.kogito.app.jobs.api.JobDetailsEventAdapter;
+import org.kie.kogito.app.jobs.api.JobExecutor;
+import org.kie.kogito.app.jobs.api.JobScheduler;
+import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
+import org.kie.kogito.app.jobs.api.JobSchedulerListener;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.app.jobs.spi.JobContext;
+import org.kie.kogito.app.jobs.spi.JobContextFactory;
+import org.kie.kogito.app.jobs.spi.JobStore;
+import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory;
+import org.kie.kogito.app.jobs.spi.memory.MemoryJobStore;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.WorkerExecutor;
+
+public class VertxJobScheduler implements JobScheduler, Handler<Long> {
+
+    /**
+     * Immutable triple of a job id, a timer id and a timeout date.
+     * NOTE(review): timerId is presumably the handle of the timer armed for the job;
+     * confirm against the usages outside this view.
+     */
+    private record TimerInfo(String jobId, Long timerId, Date timeout) {
+
+    }
+
+    private static Logger LOG = 
LoggerFactory.getLogger(VertxJobScheduler.class);
+
+    // Set to 3 by the no-arg constructor; replaced via withMaxNumberOfRetries.
+    private Integer maxNumberOfRetries;
+
+    // Set to 1000L by the no-arg constructor (presumably milliseconds; confirm).
+    private Long refreshJobsInterval;
+
+    // Starts empty; withEventPublishers appends to it.
+    private List<EventPublisher> eventPublishers;
+
+    // Starts empty; withJobExecutors appends to it.
+    private List<JobExecutor> jobExecutors;
+
+    // A MemoryJobStore unless replaced via withJobStore.
+    private JobStore jobStore;
+
+    // NOTE(review): not assigned anywhere in the visible part of the class.
+    private Vertx vertx;
+
+    // Runs the callable built in handle(Long); not assigned anywhere in the visible part of the class.
+    private WorkerExecutor workerExecutor;
+
+    // A MemoryJobContextFactory unless replaced via withJobContextFactory.
+    private JobContextFactory jobContextFactory;
+
+    // Starts empty; withJobEventAdapters appends to it.
+    private List<JobDetailsEventAdapter> jobEventAdapters;
+
+    // Starts empty; withJobSchedulerListeners appends to it.
+    private List<JobSchedulerListener> jobSchedulerListeners;
+
+    // Starts empty; withTimeoutInterceptor appends to it and build() sorts it.
+    private List<JobTimeoutInterceptor> interceptors;
+
+    // Concurrent map of TimerInfo entries (presumably keyed by job id; confirm).
+    private ConcurrentMap<String, TimerInfo> jobsScheduled;
+
+    // NOTE(review): presumably the id of the periodic refresh timer; confirm against the code that sets it.
+    private Long refreshJobsIntervalTimerId;
+
+    // Set to 5 * 60 * 1000L by the no-arg constructor (five minutes, per the comment there).
+    private Long maxRefreshJobsIntervalWindow;
+
+    // Set to 10 * 1000L by the no-arg constructor (ten seconds, per the comment there).
+    private Long retryInterval;
+
+    // Set to 10 by the no-arg constructor.
+    // NOTE(review): the only public field in this class; consider whether it needs to be public.
+    public Integer numberOfWorkerThreads;
+
+    public class VertxJobSchedulerBuilder implements JobSchedulerBuilder {
+
+        @Override
+        public JobSchedulerBuilder withRetryInterval(Long retryInterval) {
+            VertxJobScheduler.this.retryInterval = retryInterval;
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder 
withJobSchedulerListeners(JobSchedulerListener... jobSchedulerListeners) {
+            
VertxJobScheduler.this.jobSchedulerListeners.addAll(List.of(jobSchedulerListeners));
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withMaxRefreshJobsIntervalWindow(Long 
maxRefreshJobsIntervalWindow) {
+            VertxJobScheduler.this.maxRefreshJobsIntervalWindow = 
maxRefreshJobsIntervalWindow;
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withRefreshJobsInterval(Long 
refreshJobsInterval) {
+            VertxJobScheduler.this.refreshJobsInterval = refreshJobsInterval;
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withMaxNumberOfRetries(Integer 
maxNumberOfRetries) {
+            VertxJobScheduler.this.maxNumberOfRetries = maxNumberOfRetries;
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder 
withJobEventAdapters(JobDetailsEventAdapter... jobEventAdapters) {
+            
VertxJobScheduler.this.jobEventAdapters.addAll(List.of(jobEventAdapters));
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withEventPublishers(EventPublisher... 
eventPublishers) {
+            
VertxJobScheduler.this.eventPublishers.addAll(List.of(eventPublishers));
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withJobContextFactory(JobContextFactory 
jobContextFactory) {
+            VertxJobScheduler.this.jobContextFactory = jobContextFactory;
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withJobExecutors(JobExecutor... 
jobExecutors) {
+            VertxJobScheduler.this.jobExecutors.addAll(List.of(jobExecutors));
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withJobStore(JobStore jobStore) {
+            VertxJobScheduler.this.jobStore = jobStore;
+            return this;
+        }
+
+        @Override
+        public JobScheduler build() {
+            Collections.sort(VertxJobScheduler.this.interceptors);
+            return VertxJobScheduler.this;
+        }
+
+        @Override
+        public JobSchedulerBuilder 
withTimeoutInterceptor(JobTimeoutInterceptor... interceptors) {
+            VertxJobScheduler.this.interceptors.addAll(List.of(interceptors));
+            return this;
+        }
+
+        @Override
+        public JobSchedulerBuilder withNumberOfWorkerThreads(Integer 
numberOfWorkerThreads) {
+            VertxJobScheduler.this.numberOfWorkerThreads = 
numberOfWorkerThreads;
+            return this;
+        }
+
+    }
+
+    /**
+     * Creates a scheduler wired with in-memory defaults: a {@link MemoryJobStore},
+     * a {@link MemoryJobContextFactory}, empty executor/publisher/adapter/listener/
+     * interceptor lists and an empty map of scheduled jobs. The numeric settings
+     * below are starting values that the builder can replace.
+     */
+    public VertxJobScheduler() {
+        this.jobExecutors = new ArrayList<>();
+        this.jobsScheduled = new ConcurrentHashMap<>();
+        this.eventPublishers = new ArrayList<>();
+        // Assigned once; the original assigned a second MemoryJobStore and discarded the first.
+        this.jobStore = new MemoryJobStore();
+        this.jobContextFactory = new MemoryJobContextFactory();
+        this.jobEventAdapters = new ArrayList<>();
+        this.jobSchedulerListeners = new ArrayList<>();
+        this.interceptors = new ArrayList<>();
+
+        this.numberOfWorkerThreads = 10;
+        this.maxNumberOfRetries = 3;
+        this.refreshJobsInterval = 1000L;
+        this.retryInterval = 10 * 1000L; // ten seconds
+        this.maxRefreshJobsIntervalWindow = 5 * 60 * 1000L; // five minutes
+    }
+
+    /**
+     * {@link Handler} callback: builds a task that calls {@code syncWithJobStores()},
+     * wraps it with every registered {@link JobTimeoutInterceptor} in list order
+     * (so the last interceptor ends up outermost), and submits the result to the
+     * worker executor.
+     *
+     * @param timerId value passed to the handler; not used by this method
+     */
+    @Override
+    public void handle(Long timerId) {
+        Callable<Void> task = () -> {
+            syncWithJobStores();
+            return null;
+        };
+        for (JobTimeoutInterceptor wrapper : interceptors) {
+            task = wrapper.chainIntercept(task);
+        }
+        workerExecutor.executeBlocking(task);
+    }
+
+    private void syncWithJobStores() {
+        LOG.debug("Syncyng jobs");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to