PROTON-881: Add a Cat example.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/88df5e74 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/88df5e74 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/88df5e74 Branch: refs/heads/proton-j-reactor Commit: 88df5e7490183e01dfc8c63d2cfe3123286e604b Parents: 739005e Author: Adrian Preston <[email protected]> Authored: Fri Apr 17 17:55:27 2015 +0100 Committer: Adrian Preston <[email protected]> Committed: Wed May 6 23:23:34 2015 +0100 ---------------------------------------------------------------------- .../apache/qpid/proton/example/reactor/Cat.java | 95 ++++++++++++++++++++ .../qpid/proton/example/reactor/Echo.java | 69 ++++++++------ 2 files changed, 138 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88df5e74/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java new file mode 100644 index 0000000..53eb793 --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Pipe.SourceChannel; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; + +public class Cat extends BaseHandler { + + private class EchoHandler extends BaseHandler { + @Override + public void onSelectableInit(Event event) { + Selectable selectable = event.getSelectable(); + Reactor reactor = event.getReactor(); + selectable.setReading(true); + reactor.update(selectable); + } + + @Override + public void onSelectableReadable(Event event) { + Selectable selectable = event.getSelectable(); + SourceChannel channel = (SourceChannel)selectable.getChannel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + while(true) { + int amount = channel.read(buffer); + if (amount < 0) { + selectable.terminate(); + selectable.getReactor().update(selectable); + } + if (amount <= 0) break; + System.out.write(buffer.array(), 0, buffer.position()); + buffer.clear(); + } + } catch(IOException ioException) { + ioException.printStackTrace(); + selectable.terminate(); + selectable.getReactor().update(selectable); + } + } + } + + private final SourceChannel channel; + + private Cat(SourceChannel channel) { + this.channel = channel; + } + + @Override + public void onReactorInit(Event event) { + Reactor reactor = event.getReactor(); + Selectable selectable = reactor.selectable(); + selectable.setChannel(channel); + selectable.add(new EchoHandler()); + reactor.update(selectable); + } + + public static void main(String[] args) throws IOException { + if (args.length != 1) { + System.err.println("Specify a file name as an argument."); + System.exit(1); + } + FileInputStream inFile = new FileInputStream(args[0]); + SourceChannel inChannel = EchoInputStreamWrapper.wrap(inFile); + Reactor reactor = Proton.reactor(new Cat(inChannel)); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88df5e74/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java index 3913e17..2d5be72 100644 --- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java @@ -33,41 +33,58 @@ import org.apache.qpid.proton.reactor.Selectable; public class Echo extends BaseHandler { - @Override - public void onSelectableInit(Event event) { - Selectable selectable = event.getSelectable(); - Reactor reactor = event.getReactor(); - selectable.setReading(true); - reactor.update(selectable); - } + private class EchoHandler extends BaseHandler { + @Override + public void onSelectableInit(Event event) { + Selectable selectable = event.getSelectable(); + Reactor reactor = event.getReactor(); + selectable.setReading(true); + reactor.update(selectable); + } - @Override - public void onSelectableReadable(Event event) { - Selectable selectable = event.getSelectable(); - SourceChannel channel = (SourceChannel)selectable.getChannel(); - ByteBuffer buffer = ByteBuffer.allocate(1024); - try { - while(true) { - int amount = channel.read(buffer); - if (amount < 0) selectable.release(); - if (amount <= 0) break; - System.out.write(buffer.array(), 0, buffer.position()); - buffer.clear(); + @Override + public void onSelectableReadable(Event event) { + Selectable selectable = event.getSelectable(); + SourceChannel channel = (SourceChannel)selectable.getChannel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + while(true) { + int amount = channel.read(buffer); + if (amount < 0) { + selectable.terminate(); + selectable.getReactor().update(selectable); + } + if (amount <= 0) break; + System.out.write(buffer.array(), 0, buffer.position()); + buffer.clear(); + } + } catch(IOException ioException) { + ioException.printStackTrace(); + selectable.terminate(); + selectable.getReactor().update(selectable); } - } catch(IOException ioException) { - ioException.printStackTrace(); - selectable.release(); } } - public static void main(String[] args) throws IOException { + private final SourceChannel channel; - Reactor reactor = Proton.reactor(new Echo()); + private Echo(SourceChannel channel) { + this.channel = channel; + } - SourceChannel inChannel = EchoInputStreamWrapper.wrap(System.in); + @Override + public void onReactorInit(Event event) { + System.out.println("Type whatever you want and then use Control-D to exit:"); + Reactor reactor = event.getReactor(); Selectable selectable = reactor.selectable(); - selectable.setChannel(inChannel); + selectable.setChannel(channel); + selectable.add(new EchoHandler()); + reactor.update(selectable); + } + public static void main(String[] args) throws IOException { + SourceChannel inChannel = EchoInputStreamWrapper.wrap(System.in); + Reactor reactor = Proton.reactor(new Echo(inChannel)); reactor.run(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
