[
https://issues.apache.org/jira/browse/QUARKS-8?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193888#comment-15193888
]
ASF GitHub Bot commented on QUARKS-8:
-------------------------------------
Github user ddebrunner commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/9#discussion_r56055411
--- Diff:
samples/topology/src/main/java/quarks/samples/topology/JobPollingSample.java ---
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.samples.topology;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import quarks.execution.Job;
+import quarks.execution.JobRegistryService;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.runtime.etiao.JobRegistry;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Job monitoring by polling job state.
+ * <p>
+ * Demonstrates job monitoring using the {@link JobRegistryService} service.
+ * The example starts a system monitoring application, then concurrently
+ * submits two jobs. The monitoring application uses a polling source
+ * to periodically scan the job registry and generate tuples containing the
+ * current state of registered jobs. Tuples are pushed to a sink which prints
+ * them onto the system output.
+ */
+public class JobPollingSample {
+ private final DevelopmentProvider dtp;
+
+ /**
+  * Runs the sample: submits the monitoring topology, waits two seconds,
+  * then schedules two monitored jobs on a two-thread scheduled executor.
+  *
+  * @param args command-line arguments (unused)
+  * @throws Exception if the sample cannot be constructed or the initial
+  *         wait is interrupted
+  */
+ public static void main(String[] args) throws Exception {
+
+     JobPollingSample app = new JobPollingSample();
+
+     // Start monitoring app
+     app.monitor();
+
+     Thread.sleep(2000);
+
+     // Asynchronously start two jobs
+     ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
+     executor.schedule(app.getCallable("Monitored1"), 100, TimeUnit.MILLISECONDS);
+     executor.schedule(app.getCallable("Monitored2"), 3300, TimeUnit.MILLISECONDS);
+
+     // Nothing else will be scheduled. With the scheduled pool's default
+     // policy, delayed tasks that are already queued still run after
+     // shutdown(); the worker threads are released once both have finished
+     // instead of lingering for the life of the JVM.
+     executor.shutdown();
+ }
+
+ /**
+  * Creates the sample with a {@link DevelopmentProvider} that has a
+  * {@link JobRegistry} registered as its {@link JobRegistryService}.
+  *
+  * @throws Exception if creating the provider or registering the service fails
+  */
+ JobPollingSample() throws Exception {
+     DevelopmentProvider provider = new DevelopmentProvider();
+     provider.getServices().addService(JobRegistryService.class, new JobRegistry());
+     this.dtp = provider;
+ }
+
+ /**
+  * Builds, submits and later closes one monitored job.
+  * <p>
+  * The topology polls a Gaussian random value every 100 ms and prints a
+  * dot per tuple. The method waits 2 s before submitting, lets the job run
+  * for 5 s, requests {@code CLOSE}, waits another 2 s and finally removes
+  * the job from the {@link JobRegistryService}.
+  *
+  * @param name name given to the new topology
+  * @throws InterruptedException if one of the waits is interrupted
+  * @throws ExecutionException if the job submission fails
+  */
+ void monitored(String name) throws InterruptedException, ExecutionException {
+     Topology topology = dtp.newTopology(name);
+
+     Random random = new Random();
+     TStream<Double> samples = topology.poll(() -> random.nextGaussian(), 100, TimeUnit.MILLISECONDS);
+     samples.sink(tuple -> System.out.print("."));
+
+     Thread.sleep(2000);
+     Job job = dtp.submit(topology).get();
+     Thread.sleep(5000);
+     job.stateChange(Job.Action.CLOSE);
+     Thread.sleep(2000);
+
+     JobRegistryService registry = provider().getServices().getService(JobRegistryService.class);
+     registry.removeJob(job.getId());
+ }
+
+ /**
+  * Monitoring application polls the job registry every 1 sec.
+  * <p>
+  * Each poll produces an array with the current state of every job found
+  * in the {@link JobRegistryService} (an empty array when no registry
+  * service is available). The sink prints the states on one line, each
+  * followed by a comma.
+  */
+ void monitor() {
+     Topology t = dtp.newTopology("Monitor");
+
+     TStream<Job.State[]> state = t.poll(() -> {
+         JobRegistryService jobs = provider().getServices().getService(JobRegistryService.class);
+         List<Job.State> states = new ArrayList<>();
+         if (jobs != null) {
+             for (String id : jobs.getJobIds()) {
+                 // monitored() removes jobs from the registry on another
+                 // thread, so an id can disappear between getJobIds() and
+                 // getJob(). Skip ids that no longer resolve (assumes
+                 // getJob returns null for an unknown id -- confirm against
+                 // the JobRegistryService contract).
+                 Job job = jobs.getJob(id);
+                 if (job != null) {
+                     states.add(job.getCurrentState());
+                 }
+             }
+         }
+         return states.toArray(new Job.State[0]);
+     }, 1, TimeUnit.SECONDS);
+
+     state.sink(states -> {
+         // The builder never leaves this lambda invocation, so the
+         // unsynchronized StringBuilder is sufficient.
+         StringBuilder sb = new StringBuilder();
+         for (Job.State s : states) {
+             sb.append(s).append(',');
+         }
+         System.out.println(sb.toString());
+     });
+
+     dtp.submit(t);
+ }
+
+ /**
+  * Returns the provider this sample submits its topologies to.
+  *
+  * @return the {@link DevelopmentProvider} created by the constructor
+  */
+ private DevelopmentProvider provider() {
+     return this.dtp;
+ }
+
+ private Callable<Integer> getCallable(String name) {
+ return new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ monitored(name);
+ return new Integer(0);
--- End diff --
Just "return 0"?
> Restart topology on uncaught exception
> --------------------------------------
>
> Key: QUARKS-8
> URL: https://issues.apache.org/jira/browse/QUARKS-8
> Project: Quarks
> Issue Type: New Feature
> Components: Runtime
> Reporter: Victor Dogaru
> Assignee: Victor Dogaru
> Labels: failure-recovery
>
> If a Quarks thread abruptly terminates due to an uncaught exception, the
> runtime shuts down.
> A mechanism is needed to prevent the Quarks application from becoming
> unavailable:
> * Have a monitor service which resubmits the topology in case it shuts down.
> * Restart the failed oplet. In the ETIAO runtime, a source and all downstream
> oplet invocations (up to an Isolate) will execute in the same thread, so the
> runtime needs to restart the failed path.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)