Updated Branches: refs/heads/master 012278dd7 -> 80805f85a
BATCHEE-11 supporting stop command locally (no issue using jaxrs) using a socket (deactivated by default) Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/80805f85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/80805f85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/80805f85 Branch: refs/heads/master Commit: 80805f85a046a1b5ede645fd28dec560758ca668 Parents: 012278d Author: Romain Manni-Bucau <[email protected]> Authored: Sun Jan 5 22:47:40 2014 +0100 Committer: Romain Manni-Bucau <[email protected]> Committed: Sun Jan 5 22:47:40 2014 +0100 ---------------------------------------------------------------------- .../java/org/apache/batchee/util/Batches.java | 6 +- .../org/apache/batchee/cli/command/Abandon.java | 15 ++- .../batchee/cli/command/JobOperatorCommand.java | 2 +- .../batchee/cli/command/SocketCommand.java | 110 +++++++++++++++++++ .../cli/command/SocketConfigurableCommand.java | 27 +++++ .../batchee/cli/command/StartableCommand.java | 101 ++++++++++++++++- .../org/apache/batchee/cli/command/Stop.java | 13 ++- .../java/org/apache/batchee/cli/MainTest.java | 27 ++++- .../batchee/cli/component/LongSample.java | 2 +- 9 files changed, 283 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/jbatch/src/main/java/org/apache/batchee/util/Batches.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/util/Batches.java b/jbatch/src/main/java/org/apache/batchee/util/Batches.java index 6b43545..b038a2e 100644 --- a/jbatch/src/main/java/org/apache/batchee/util/Batches.java +++ b/jbatch/src/main/java/org/apache/batchee/util/Batches.java @@ -40,6 +40,10 @@ public class Batches { } catch (final InterruptedException e) { return; } - } while (!BATCH_END_STATUSES.contains(jobOperator.getJobExecution(id).getBatchStatus())); + } while (!isDone(jobOperator, id)); + } + + public static boolean isDone(final JobOperator jobOperator, final long id) { + return BATCH_END_STATUSES.contains(jobOperator.getJobExecution(id).getBatchStatus()); } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/main/java/org/apache/batchee/cli/command/Abandon.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/Abandon.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/Abandon.java index c4ea4f7..4f4259a 100644 --- a/tools/cli/src/main/java/org/apache/batchee/cli/command/Abandon.java +++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/Abandon.java @@ -20,13 +20,22 @@ import io.airlift.command.Command; import io.airlift.command.Option; @Command(name = "abandon", description = "abandon a batch from its id") -public class Abandon extends JobOperatorCommand { +public class Abandon extends SocketCommand { @Option(name = "-id", description = "id of the batch to abandon", required = true) private long id; @Override - public void doRun() { - operator().abandon(id); + protected void postCommand() { info("Abandonned batch " + id); } + + @Override + protected String command() { + return "abandon " + id + " " + wait; + } + + @Override + protected void defaultRun() { + operator().abandon(id); + } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java index b910732..001e4cb 100644 --- a/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java +++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java @@ -52,7 +52,7 @@ public abstract class JobOperatorCommand implements Runnable { // Remote config @Option(name = "-url", description = "when using JAXRS the batchee resource url") - private String baseUrl = null; + protected String baseUrl = null; @Option(name = "-json", description = "when using JAXRS the json provider") private String jsonProvider = null; http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketCommand.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketCommand.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketCommand.java new file mode 100644 index 0000000..efc2070 --- /dev/null +++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketCommand.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cli.command; + +import io.airlift.command.Option; +import org.apache.batchee.container.exception.BatchContainerRuntimeException; +import org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class SocketCommand extends SocketConfigurableCommand { + @Option(name = "-timeout", description = "timeout for socket case") + private int timeout = 60000; + + protected boolean shouldUseSocket() { + return baseUrl == null; + } + + protected abstract String command(); + protected abstract void defaultRun(); + + protected void sendCommand() { + if (adminSocket < 0) { + throw new BatchContainerRuntimeException("specify -socket to be able to run this command"); + } + + Socket socket = null; + try { + socket = new Socket("localhost", adminSocket); + socket.setSoTimeout(timeout); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Integer> answer = new AtomicReference<Integer>(); + new AnswerThread(socket, answer, latch).start(); + + final OutputStream outputStream = socket.getOutputStream(); + outputStream.write(command().getBytes()); + outputStream.flush(); + socket.shutdownOutput(); + + try { + latch.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + info("no answer after " + timeout + "ms"); + return; + } + if (answer.get() != 0) { + info("unexpected answer: " + answer.get()); + } + } catch (final IOException e) { + throw new BatchContainerRuntimeException(e); + } finally { + IOUtils.closeQuietly(socket); + } + } + + @Override + public void doRun() { + if (shouldUseSocket()) { + sendCommand(); + } else { + defaultRun(); + } + postCommand(); + } + + protected abstract void postCommand(); + + private static class AnswerThread extends Thread { + private final Socket socket; + private final AtomicReference<Integer> answer; + private final CountDownLatch latch; + + public AnswerThread(final Socket socket, final AtomicReference<Integer> answer, final CountDownLatch latch) { + this.socket = socket; + this.answer = answer; + this.latch = latch; + setName("batchee-answer-thread"); + } + + @Override + public void run() { + try { + answer.set(socket.getInputStream().read()); + } catch (IOException e) { + answer.set(-1); + } + latch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketConfigurableCommand.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketConfigurableCommand.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketConfigurableCommand.java new file mode 100644 index 0000000..d1b2aeb --- /dev/null +++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/SocketConfigurableCommand.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cli.command; + +import io.airlift.command.Option; + +public abstract class SocketConfigurableCommand extends JobOperatorCommand { + @Option(name = "-wait", description = "should wait the end of the batch", arity = 1) + protected boolean wait = true; + + @Option(name = "-socket", description = "socket listening for stop/abandon commands") + protected int adminSocket = -1; +} http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/main/java/org/apache/batchee/cli/command/StartableCommand.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/StartableCommand.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/StartableCommand.java index 7440bc1..e1edf1b 100644 --- a/tools/cli/src/main/java/org/apache/batchee/cli/command/StartableCommand.java +++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/StartableCommand.java @@ -17,35 +17,60 @@ package org.apache.batchee.cli.command; import io.airlift.command.Arguments; -import io.airlift.command.Option; import org.apache.batchee.util.Batches; +import org.apache.commons.io.IOUtils; import javax.batch.operations.JobOperator; import javax.batch.runtime.BatchStatus; import javax.batch.runtime.JobExecution; import javax.batch.runtime.StepExecution; +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; -public abstract class StartableCommand extends JobOperatorCommand { +public abstract class StartableCommand extends SocketConfigurableCommand { private static final String LINE = "========================="; - @Option(name = "-wait", description = "should wait the end of the batch") - protected boolean wait = true; - @Arguments(description = "properties to pass to the batch") protected List<String> properties; @Override public void doRun() { final JobOperator operator = operator(); - final long id = doStart(operator); + + final AdminThread adminThread; + if (adminSocket > 0) { + adminThread = new AdminThread(operator, adminSocket); + adminThread.setName("batchee-admin-thread"); + adminThread.start(); + } else { + info("Admin mode deactivated, use -socket to activate it"); + adminThread = null; + } + + final long id; + try { + id = doStart(operator); + } catch (final Exception e) { + if (adminThread != null && adminThread.getServerSocket() != null) { + IOUtils.closeQuietly(adminThread.getServerSocket()); + } + e.printStackTrace(); + return; + } + if (wait) { Batches.waitForEnd(operator, id); report(operator, id); } + if (adminThread != null) { + adminThread.setId(id); + } } protected abstract long doStart(JobOperator operator); @@ -96,4 +121,68 @@ public abstract class StartableCommand extends JobOperatorCommand { } return props; } + + private static class AdminThread extends Thread { + private final JobOperator operator; + private final int adminSocketPort; + private ServerSocket serverSocket = null; + private long id = Integer.MIN_VALUE; + + public AdminThread(final JobOperator operator, final int adminSocket) { + this.operator = operator; + this.adminSocketPort = adminSocket; + } + + @Override + public void run() { + try { + serverSocket = new ServerSocket(adminSocketPort); + while (Integer.MIN_VALUE == id || !Batches.isDone(operator, id)) { + final Socket client = serverSocket.accept(); + final OutputStream outputStream = client.getOutputStream(); + synchronized (this) { // no need to support N clients + try { + final String[] command = IOUtils.toString(client.getInputStream()).trim().split(" "); + if (command.length >= 2) { + final long id = Long.parseLong(command[1]); + try { + if ("stop".equals(command[0])) { + operator.stop(id); + } else if ("abandon".equals(command[0])) { + operator.abandon(id); + } + } catch (final Exception e) { + // no-op + } + + if (command.length >= 3 && Boolean.parseBoolean(command[2])) { + Batches.waitForEnd(id); + } + + // let the client close if waiting + outputStream.write(0); + } else { // error + outputStream.write(-1); + } + outputStream.flush(); + } finally { + IOUtils.closeQuietly(client); + } + } + } + } catch (final IOException e) { + e.printStackTrace(); + } finally { + IOUtils.closeQuietly(serverSocket); + } + } + + public ServerSocket getServerSocket() { + return serverSocket; + } + + public void setId(final long id) { + this.id = id; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/main/java/org/apache/batchee/cli/command/Stop.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/Stop.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/Stop.java index c65d0f6..6ccc89b 100644 --- a/tools/cli/src/main/java/org/apache/batchee/cli/command/Stop.java +++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/Stop.java @@ -20,13 +20,22 @@ import io.airlift.command.Command; import io.airlift.command.Option; @Command(name = "stop", description = "stop a batch from its id") -public class Stop extends JobOperatorCommand { +public class Stop extends SocketCommand { @Option(name = "-id", description = "id of the batch to stop", required = true) private long id; @Override - public void doRun() { + protected String command() { + return "stop " + id + " " + wait; + } + + @Override + protected void defaultRun() { operator().stop(id); + } + + @Override + protected void postCommand() { info("Stopped batch " + id); } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/test/java/org/apache/batchee/cli/MainTest.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/test/java/org/apache/batchee/cli/MainTest.java b/tools/cli/src/test/java/org/apache/batchee/cli/MainTest.java index 0d966ba..2ebce01 100644 --- a/tools/cli/src/test/java/org/apache/batchee/cli/MainTest.java +++ b/tools/cli/src/test/java/org/apache/batchee/cli/MainTest.java @@ -81,14 +81,29 @@ public class MainTest { } @Test - public void stop() { - final JobOperator jobOperator = BatchRuntime.getJobOperator(); - final long id = jobOperator.start("long-sample", null); - - main(new String[]{"stop", "-id", Long.toString(id)}); + public void stop() { // abandon is the same + final Thread start = new Thread() { + @Override + public void run() { + main(new String[]{ "start", "-name", "long-sample", "-socket", "1236", "-wait", "false" }); + } + }; + start.run(); + + final String str = "Batch 'long-sample' started with id #"; + + final String out = stdout.getLog(); + int idx; + do { + idx = out.indexOf(str); + } while (idx < 0); + final int end = out.indexOf(System.getProperty("line.separator")); + final long id = Long.parseLong(out.substring(idx + str.length(), end)); + + main(new String[]{"stop", "-id", Long.toString(id), "-socket", "1236"}); assertThat(stdout.getLog(), containsString("Stopped")); - Batches.waitForEnd(jobOperator, id); + Batches.waitForEnd(id); } @Test http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/80805f85/tools/cli/src/test/java/org/apache/batchee/cli/component/LongSample.java ---------------------------------------------------------------------- diff --git a/tools/cli/src/test/java/org/apache/batchee/cli/component/LongSample.java b/tools/cli/src/test/java/org/apache/batchee/cli/component/LongSample.java index dd5776b..65c644b 100644 --- a/tools/cli/src/test/java/org/apache/batchee/cli/component/LongSample.java +++ b/tools/cli/src/test/java/org/apache/batchee/cli/component/LongSample.java @@ -25,7 +25,7 @@ import static java.lang.Thread.sleep; public class LongSample extends AbstractBatchlet { @Override public String process() throws Exception { - sleep(1000); + sleep(1500); return "ok"; } }
