http://git-wip-us.apache.org/repos/asf/tomee/blob/bd7dbd0f/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java ---------------------------------------------------------------------- diff --git a/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java b/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java index c08d37b..7881062 100644 --- a/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java +++ b/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java @@ -1,778 +1,778 @@ -package org.apache.openejb.client; - -import sun.net.util.IPAddressUtil; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.net.NetworkInterface; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.ConcurrentModificationException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class MulticastPulseClient extends MulticastConnectionFactory { - - public static final String ORG_APACHE_OPENEJB_MULTIPULSE_TTL = "org.apache.openejb.multipulse.ttl"; - public static final String ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT = "org.apache.openejb.multipulse.uri.limit"; - - private static final Logger log = Logger.getLogger("OpenEJB.client"); - private static final String SERVER = "OpenEJB.MCP.Server:"; - private static final String CLIENT = "OpenEJB.MCP.Client:"; - private static final String BADURI = ":BadUri:"; - private static final String EMPTY = "NoService"; - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32")); - private static final int LIMIT = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT, "50000")); - private static final Map<URI, Set<URI>> knownUris = new HashMap<URI, Set<URI>>(); - private static NetworkInterface[] interfaces = getNetworkInterfaces(); - private static ExecutorService executor = null; - - private static synchronized NetworkInterface[] getInterfaces() { - if (null == interfaces) { - interfaces = getNetworkInterfaces(); - } - - return interfaces; - } - - private static synchronized ExecutorService getExecutorService() { - - if (null == executor) { - - int length = getInterfaces().length; - if (length < 1) { - length = 1; - } - - executor = Executors.newFixedThreadPool(length * 2); - } - - return executor; - } - - /** - * @param uri Connection URI - * @return Connection - * @throws IOException or error - * @throws IllegalArgumentException On undefined error - */ - @Override - public Connection getConnection(final URI uri) throws IOException { - - if (knownUris.size() >= LIMIT) { - //This is here just as a brake to prevent DOS or OOME. - //There is no way we should have more than this number of unique MutliPulse URI's in a LAN - throw new IllegalArgumentException("Unique MultiPulse URI limit of " + - LIMIT + - " reached. Increase using the system property '" + - ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT + - "'"); - } - - Set<URI> uriSet = knownUris.get(uri); - - if (null == uriSet || uriSet.isEmpty()) { - - final Map<String, String> params = getUriParameters(uri); - - final Set<String> schemes = getSet(params, "schemes", this.getDefaultSchemes()); - final String group = getString(params, "group", "default"); - final long timeout = getLong(params, "timeout", 250); - - try { - uriSet = MulticastPulseClient.discoverURIs(group, schemes, uri.getHost(), uri.getPort(), timeout); - } catch (final Exception e) { - throw new IllegalArgumentException("Unable to find an ejb server via the MultiPulse URI: " + uri); - } - - knownUris.put(uri, uriSet); - } - - for (final URI serviceURI : uriSet) { - - //Strip serverhost and group and try to connect - final URI tryUri = URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()); - - try { - return ConnectionManager.getConnection(tryUri); - } catch (final Exception e) { - - uriSet.remove(serviceURI); - - if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) { - //Notify server that this URI is not reachable - MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri), "group", "default"), tryUri, uri.getHost(), uri.getPort()); - } - - if (log.isLoggable(Level.FINE)) { - log.fine("Failed connection to: " + serviceURI); - } - } - } - - throw new IOException("Unable to connect an ejb server via the MultiPulse URI: " + uri); - } - - private static Map<String, String> getUriParameters(final URI uri) { - final Map<String, String> params; - try { - params = URIs.parseParamters(uri); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Invalid MultiPulse uri " + uri.toString(), e); - } - return params; - } - - /** - * Get a list of URIs discovered for the provided request. - * <p/> - * Returned URIs are of the format 'mp-{serverhost}:group:scheme://servicehost:port'. - * The serverhost is prefixed with 'mp-' in case the serverhost is an IP-Address, as RFC 2396 defines scheme must begin with a 'letter' - * - * @param forGroup Specific case sensitive group name or * for all - * @param schemes Acceptable scheme list - * @param host Multicast host address - * @param port Multicast port - * @param timeout Time to wait for a server response, at least 50ms - * @return A URI set, possibly empty - * @throws Exception On error - */ - public static Set<URI> discoverURIs(final String forGroup, final Set<String> schemes, final String host, final int port, long timeout) throws Exception { - - if (timeout < 50) { - timeout = 50; - } - - if (null == forGroup || forGroup.isEmpty()) { - throw new Exception("Specify a valid group or *"); - } - - if (null == schemes || schemes.isEmpty()) { - throw new Exception("Specify at least one scheme, 'ejbd' for example"); - } - - if (null == host || host.isEmpty()) { - throw new Exception("Specify a valid host name"); - } - - if (port < 1 || port > 65535) { - throw new Exception("Specify a valid port between 1 and 65535"); - } - - final InetAddress ia = getAddress(host); - - final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8); - final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port)); - - final AtomicBoolean running = new AtomicBoolean(true); - final List<Future> futures = Collections.synchronizedList(new ArrayList<Future>()); - - MulticastSocket[] clientSockets = null; - - try { - clientSockets = MulticastPulseClient.getSockets(ia, port); - final MulticastSocket[] clientSocketsFinal = clientSockets; - - final Timer timer = new Timer(true); - - final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() { - @Override - public int compare(final URI uri1, final URI uri2) { - - //Ignore server hostname - URI u1 = URI.create(uri1.getSchemeSpecificPart()); - URI u2 = URI.create(uri2.getSchemeSpecificPart()); - - //Ignore scheme (ejb,ejbs,etc.) - u1 = URI.create(u1.getSchemeSpecificPart()); - u2 = URI.create(u2.getSchemeSpecificPart()); - - //Compare URI hosts - int i = compare(u1.getHost(), u2.getHost()); - if (i != 0) { - i = uri1.compareTo(uri2); - } - - return i; - } - - private int compare(final String h1, final String h2) { - - //Sort by hostname, IPv4, IPv6 - - try { - if (IPAddressUtil.isIPv4LiteralAddress(h1)) { - if (IPAddressUtil.isIPv6LiteralAddress(h2.replace("[", "").replace("]", ""))) { - return -1; - } - } else if (IPAddressUtil.isIPv6LiteralAddress(h1.replace("[", "").replace("]", ""))) { - if (IPAddressUtil.isIPv4LiteralAddress(h2)) { - return 1; - } - } else if (0 != h1.compareTo(h2)) { - return -1; - } - } catch (final Exception e) { - //Ignore - } - - return h1.compareTo(h2); - } - }); - - final ReentrantLock setLock = new ReentrantLock(); - - //Start threads that listen for multicast packets on our channel. - //These need to start 'before' we pulse a request. - final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length); - - for (final MulticastSocket socket : clientSocketsFinal) { - - futures.add(getExecutorService().submit(new Runnable() { - @Override - public void run() { - try { - final DatagramPacket response = new DatagramPacket(new byte[2048], 2048); - latchListeners.countDown(); - - while (running.get()) { - try { - - socket.receive(response); - - final SocketAddress sa = response.getSocketAddress(); - - if (null != sa && (sa instanceof InetSocketAddress)) { - - int len = response.getLength(); - if (len > 2048) { - - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len}); - } - len = 2048; - } - - String s = new String(response.getData(), 0, len); - - if (s.startsWith(MulticastPulseClient.SERVER)) { - - s = (s.replace(MulticastPulseClient.SERVER, "")); - final String group = s.substring(0, s.indexOf(':')); - s = s.substring(group.length() + 1); - - if (!"*".equals(forGroup) && !forGroup.equals(group)) { - continue; - } - - final String services = s.substring(0, s.lastIndexOf('|')); - s = s.substring(services.length() + 1); - - final String[] serviceList = services.split("\\|"); - final String[] hosts = s.split(","); - - for (final String svc : serviceList) { - - if (EMPTY.equals(svc)) { - continue; - } - - final URI serviceUri; - try { - serviceUri = URI.create(svc); - } catch (final Exception e) { - continue; - } - - if (schemes.contains(serviceUri.getScheme())) { - - //Just because multicast was received on this host is does not mean the service is on the same - //We can however use this to identify an individual machine and group - final String serverHost = ((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress(); - - final String serviceHost = serviceUri.getHost(); - if (MulticastPulseClient.isLocalAddress(serviceHost, false)) { - if (!MulticastPulseClient.isLocalAddress(serverHost, false)) { - //A local service is only available to a local client - continue; - } - } - - final String svcfull = ("mp-" + serverHost + ":" + group + ":" + svc); - - setLock.lock(); - - try { - if (svcfull.contains("0.0.0.0")) { - for (final String h : hosts) { - if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo - set.add(URI.create(svcfull.replace("0.0.0.0", ipFormat(h)))); - } - } - } else if (svcfull.contains("[::]")) { - for (final String h : hosts) { - if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo - set.add(URI.create(svcfull.replace("[::]", ipFormat(h)))); - } - } - } else { - //Just add as is - set.add(URI.create(svcfull)); - } - } catch (final Exception e) { - //Ignore - } finally { - setLock.unlock(); - } - } - } - } - } - - } catch (final Exception e) { - //Ignore - } - } - } finally { - try { - socket.leaveGroup(ia); - } catch (final Exception e) { - //Ignore - } - try { - socket.close(); - } catch (final Exception e) { - //Ignore - } - } - } - })); - } - - try { - //Give listener threads a reasonable amount of time to start - if (latchListeners.await(clientSocketsFinal.length * 2, TimeUnit.SECONDS)) { - - //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout - //This pulse is designed to tell a listening server to wake up and pulse back a response - futures.add(0, getExecutorService().submit(new Runnable() { - @Override - public void run() { - while (running.get()) { - //Pulse to listening servers - It is thread safe to use same sockets as send/receive synchronization is only on the packet - for (final MulticastSocket socket : clientSocketsFinal) { - - if (running.get()) { - try { - socket.send(request); - } catch (final Exception e) { - //Ignore - } - } else { - break; - } - } - - if (running.get()) { - try { - Thread.sleep(10); - } catch (final InterruptedException e) { - break; - } - } - } - } - })); - } else { - timeout = 1; - } - - } catch (final InterruptedException e) { - //Terminate as quickly as possible - timeout = 1; - } - - //Kill the threads after timeout - timer.schedule(new TimerTask() { - @Override - public void run() { - - running.set(false); - - try { - for (final Future future : futures) { - future.cancel(true); - } - } catch (final ConcurrentModificationException e) { - //Ignore - } - - } - }, timeout); - - //Wait for threads to complete - for (final Future future : futures) { - try { - future.get(); - } catch (final Exception e) { - //Ignore - } - } - - setLock.lock(); - try { - return new TreeSet<URI>(set); - } finally { - setLock.unlock(); - } - } finally { - - //Just to be sure we are clean - for (final Future future : futures) { - try { - future.cancel(true); - } catch (final Exception e) { - //Ignore - } - } - - futures.clear(); - - for (final MulticastSocket socket : clientSockets) { - - try { - socket.leaveGroup(ia); - } catch (final Exception e) { - //Ignore - } - try { - socket.close(); - } catch (final Exception e) { - //Ignore - } - } - } - } - - private static InetAddress getAddress(final String host) throws Exception { - final InetAddress ia; - try { - ia = InetAddress.getByName(host); - } catch (final UnknownHostException e) { - throw new Exception(host + " is not a valid address", e); - } - - if (null == ia || !ia.isMulticastAddress()) { - throw new Exception(host + " is not a valid multicast address"); - } - return ia; - } - - /** - * Is the provided host a local host - * - * @param host The host to test - * @param wildcardIsLocal Should 0.0.0.0 or [::] be deemed as local - * @return True is the host is a local host else false - */ - public static boolean isLocalAddress(final String host, final boolean wildcardIsLocal) { - - final InetAddress addr; - try { - addr = InetAddress.getByName(host); - } catch (final UnknownHostException e) { - return false; - } - - // Check if the address is a valid special local or loop back - if ((wildcardIsLocal && addr.isAnyLocalAddress()) || addr.isLoopbackAddress()) { - return true; - } - - // Check if the address is defined on any local interface - try { - return NetworkInterface.getByInetAddress(addr) != null; - } catch (final SocketException e) { - return false; - } - } - - private static String ipFormat(final String h) throws UnknownHostException { - - final InetAddress ia = InetAddress.getByName(h); - if (ia instanceof Inet6Address) { - return "[" + ia.getHostAddress() + "]"; - } else { - return h; - } - } - - public static MulticastSocket[] getSockets(final InetAddress ia, final int port) throws Exception { - - final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>(); - - for (final NetworkInterface ni : getInterfaces()) { - - MulticastSocket ms = null; - - try { - - ms = new MulticastSocket(port); - ms.setNetworkInterface(ni); - ms.setSoTimeout(0); - ms.setTimeToLive(TTL); - if (!ms.getBroadcast()) { - ms.setBroadcast(true); - } - ms.joinGroup(ia); - - list.add(ms); - - } catch (final Exception e) { - - if (null != ms) { - try { - ms.close(); - } catch (final Exception t) { - //Ignore - } - } - - } - } - - return list.toArray(new MulticastSocket[list.size()]); - } - - private static NetworkInterface[] getNetworkInterfaces() { - - final HashSet<NetworkInterface> list = new HashSet<NetworkInterface>(); - - try { - final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); - while (interfaces.hasMoreElements()) { - final NetworkInterface next = interfaces.nextElement(); - - if (next.supportsMulticast() && next.isUp()) { - list.add(next); - } - } - } catch (final SocketException e) { - //Ignore - } - - return list.toArray(new NetworkInterface[list.size()]); - } - - private static final CommandParser cmd = new CommandParser() { - @Override - protected void init() { - category("Options"); - - opt('g', "group").type(String.class).value("*").description("Group name"); - - opt('h', "host").type(String.class).value("239.255.3.2").description("Multicast address"); - - opt('p', "port").type(int.class).value(6142).description("Multicast port"); - - opt('t', "timeout").type(int.class).value(1000).description("Pulse back timeout"); - } - - @Override - protected List<String> validate(final Arguments arguments) { - return super.validate(arguments); - } - - @Override - protected List<String> usage() { - return super.usage(); - } - }; - - @SuppressWarnings("UseOfSystemOutOrSystemErr") - public static void main(final String[] args) throws Exception { - - final CommandParser.Arguments arguments; - try { - arguments = cmd.parse(args); - } catch (final CommandParser.HelpException e) { - System.exit(0); - throw new Exception(); // never reached, but keeps compiler happy - } catch (final CommandParser.InvalidOptionsException e) { - System.exit(1); - throw new Exception(); // never reached, but keeps compiler happy - } - - final Options options = arguments.options(); - - final String discover = options.get("group", "*"); - final String mchost = options.get("host", "239.255.3.2"); - final int mcport = options.get("port", 6142); - final int timeout = options.get("timeout", 1500); - - System.out.println(String.format("Using discovery options group=%1$s, host=%2$s, port=%3$s, timeout=%4$s", discover, mchost, mcport, timeout)); - System.out.println(); - - final AtomicBoolean running = new AtomicBoolean(true); - - final Thread t = new Thread(new Runnable() { - @SuppressWarnings("UseOfSystemOutOrSystemErr") - @Override - public void run() { - while (running.get()) { - - Set<URI> uriSet = null; - try { - uriSet = MulticastPulseClient.discoverURIs(discover, new HashSet<String>(Arrays.asList("ejbd", "ejbds", "http", "https")), mchost, mcport, timeout); - } catch (final Exception e) { - System.err.println(e.getMessage()); - } - - final int size = uriSet.size(); - if (uriSet != null && size > 0) { - - final int st = (timeout / size); - - for (final URI uri : uriSet) { - - final String server = uri.getScheme().replace("mp-", ""); - URI uriSub = URI.create(uri.getSchemeSpecificPart()); - - final String group = uriSub.getScheme(); - uriSub = URI.create(uriSub.getSchemeSpecificPart()); - - final String host = uriSub.getHost(); - final int port = uriSub.getPort(); - - if (MulticastPulseClient.isLocalAddress(host, false) && !MulticastPulseClient.isLocalAddress(server, false)) { - System.out.println(server + ":" + group + " - " + uriSub.toASCIIString() + " is not a local service"); - continue; - } - - System.out.print(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: "); - - boolean b = false; - final Socket s = new Socket(); - try { - s.connect(new InetSocketAddress(host, port), st); - b = true; - } catch (final Exception e) { - if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) { - MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport); - System.out.print("" + e + " : "); - } - } finally { - try { - s.close(); - } catch (final Exception e) { - //Ignore - } - } - - System.out.println(b); - } - } else { - System.out.println("### Failed to discover server: " + discover); - } - - System.out.println("."); - - try { - Thread.sleep(500); - } catch (final InterruptedException e) { - //Ignore - } - } - } - }, "MulticastPulseClient Test"); - - t.setDaemon(true); - t.start(); - - //noinspection ResultOfMethodCallIgnored - System.in.read(); - - running.set(false); - t.interrupt(); - } - - /** - * Asynchronous attempt to broadcast a bad URI on our channel. - * Hopefully the culprit server will hear this and stop sending it. - * - * @param uri Bad URI to broadcast - */ - private static void broadcastBadUri(final String group, final URI uri, final String host, final int port) { - - getExecutorService().submit(new Runnable() { - @Override - public void run() { - try { - final InetAddress ia = getAddress(host); - - final byte[] bytes = (MulticastPulseClient.CLIENT + group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8); - final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port)); - - final MulticastSocket[] multicastSockets = MulticastPulseClient.getSockets(ia, port); - - for (final MulticastSocket socket : multicastSockets) { - - try { - socket.send(request); - } catch (final Exception e) { - log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e); - } - } - } catch (final Exception e) { - log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri, e); - } - } - }); - } -} +package org.apache.openejb.client; + +import sun.net.util.IPAddressUtil; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.ConcurrentModificationException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class MulticastPulseClient extends MulticastConnectionFactory { + + public static final String ORG_APACHE_OPENEJB_MULTIPULSE_TTL = "org.apache.openejb.multipulse.ttl"; + public static final String ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT = "org.apache.openejb.multipulse.uri.limit"; + + private static final Logger log = Logger.getLogger("OpenEJB.client"); + private static final String SERVER = "OpenEJB.MCP.Server:"; + private static final String CLIENT = "OpenEJB.MCP.Client:"; + private static final String BADURI = ":BadUri:"; + private static final String EMPTY = "NoService"; + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32")); + private static final int LIMIT = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT, "50000")); + private static final Map<URI, Set<URI>> knownUris = new HashMap<URI, Set<URI>>(); + private static NetworkInterface[] interfaces = getNetworkInterfaces(); + private static ExecutorService executor = null; + + private static synchronized NetworkInterface[] getInterfaces() { + if (null == interfaces) { + interfaces = getNetworkInterfaces(); + } + + return interfaces; + } + + private static synchronized ExecutorService getExecutorService() { + + if (null == executor) { + + int length = getInterfaces().length; + if (length < 1) { + length = 1; + } + + executor = Executors.newFixedThreadPool(length * 2); + } + + return executor; + } + + /** + * @param uri Connection URI + * @return Connection + * @throws IOException or error + * @throws IllegalArgumentException On undefined error + */ + @Override + public Connection getConnection(final URI uri) throws IOException { + + if (knownUris.size() >= LIMIT) { + //This is here just as a brake to prevent DOS or OOME. + //There is no way we should have more than this number of unique MutliPulse URI's in a LAN + throw new IllegalArgumentException("Unique MultiPulse URI limit of " + + LIMIT + + " reached. Increase using the system property '" + + ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT + + "'"); + } + + Set<URI> uriSet = knownUris.get(uri); + + if (null == uriSet || uriSet.isEmpty()) { + + final Map<String, String> params = getUriParameters(uri); + + final Set<String> schemes = getSet(params, "schemes", this.getDefaultSchemes()); + final String group = getString(params, "group", "default"); + final long timeout = getLong(params, "timeout", 250); + + try { + uriSet = MulticastPulseClient.discoverURIs(group, schemes, uri.getHost(), uri.getPort(), timeout); + } catch (final Exception e) { + throw new IllegalArgumentException("Unable to find an ejb server via the MultiPulse URI: " + uri); + } + + knownUris.put(uri, uriSet); + } + + for (final URI serviceURI : uriSet) { + + //Strip serverhost and group and try to connect + final URI tryUri = URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()); + + try { + return ConnectionManager.getConnection(tryUri); + } catch (final Exception e) { + + uriSet.remove(serviceURI); + + if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) { + //Notify server that this URI is not reachable + MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri), "group", "default"), tryUri, uri.getHost(), uri.getPort()); + } + + if (log.isLoggable(Level.FINE)) { + log.fine("Failed connection to: " + serviceURI); + } + } + } + + throw new IOException("Unable to connect an ejb server via the MultiPulse URI: " + uri); + } + + private static Map<String, String> getUriParameters(final URI uri) { + final Map<String, String> params; + try { + params = URIs.parseParamters(uri); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException("Invalid MultiPulse uri " + uri.toString(), e); + } + return params; + } + + /** + * Get a list of URIs discovered for the provided request. + * <p/> + * Returned URIs are of the format 'mp-{serverhost}:group:scheme://servicehost:port'. + * The serverhost is prefixed with 'mp-' in case the serverhost is an IP-Address, as RFC 2396 defines scheme must begin with a 'letter' + * + * @param forGroup Specific case sensitive group name or * for all + * @param schemes Acceptable scheme list + * @param host Multicast host address + * @param port Multicast port + * @param timeout Time to wait for a server response, at least 50ms + * @return A URI set, possibly empty + * @throws Exception On error + */ + public static Set<URI> discoverURIs(final String forGroup, final Set<String> schemes, final String host, final int port, long timeout) throws Exception { + + if (timeout < 50) { + timeout = 50; + } + + if (null == forGroup || forGroup.isEmpty()) { + throw new Exception("Specify a valid group or *"); + } + + if (null == schemes || schemes.isEmpty()) { + throw new Exception("Specify at least one scheme, 'ejbd' for example"); + } + + if (null == host || host.isEmpty()) { + throw new Exception("Specify a valid host name"); + } + + if (port < 1 || port > 65535) { + throw new Exception("Specify a valid port between 1 and 65535"); + } + + final InetAddress ia = getAddress(host); + + final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8); + final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port)); + + final AtomicBoolean running = new AtomicBoolean(true); + final List<Future> futures = Collections.synchronizedList(new ArrayList<Future>()); + + MulticastSocket[] clientSockets = null; + + try { + clientSockets = MulticastPulseClient.getSockets(ia, port); + final MulticastSocket[] clientSocketsFinal = clientSockets; + + final Timer timer = new Timer(true); + + final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() { + @Override + public int compare(final URI uri1, final URI uri2) { + + //Ignore server hostname + URI u1 = URI.create(uri1.getSchemeSpecificPart()); + URI u2 = URI.create(uri2.getSchemeSpecificPart()); + + //Ignore scheme (ejb,ejbs,etc.) + u1 = URI.create(u1.getSchemeSpecificPart()); + u2 = URI.create(u2.getSchemeSpecificPart()); + + //Compare URI hosts + int i = compare(u1.getHost(), u2.getHost()); + if (i != 0) { + i = uri1.compareTo(uri2); + } + + return i; + } + + private int compare(final String h1, final String h2) { + + //Sort by hostname, IPv4, IPv6 + + try { + if (IPAddressUtil.isIPv4LiteralAddress(h1)) { + if (IPAddressUtil.isIPv6LiteralAddress(h2.replace("[", "").replace("]", ""))) { + return -1; + } + } else if (IPAddressUtil.isIPv6LiteralAddress(h1.replace("[", "").replace("]", ""))) { + if (IPAddressUtil.isIPv4LiteralAddress(h2)) { + return 1; + } + } else if (0 != h1.compareTo(h2)) { + return -1; + } + } catch (final Exception e) { + //Ignore + } + + return h1.compareTo(h2); + } + }); + + final ReentrantLock setLock = new ReentrantLock(); + + //Start threads that listen for multicast packets on our channel. + //These need to start 'before' we pulse a request. + final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length); + + for (final MulticastSocket socket : clientSocketsFinal) { + + futures.add(getExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + final DatagramPacket response = new DatagramPacket(new byte[2048], 2048); + latchListeners.countDown(); + + while (running.get()) { + try { + + socket.receive(response); + + final SocketAddress sa = response.getSocketAddress(); + + if (null != sa && (sa instanceof InetSocketAddress)) { + + int len = response.getLength(); + if (len > 2048) { + + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len}); + } + len = 2048; + } + + String s = new String(response.getData(), 0, len); + + if (s.startsWith(MulticastPulseClient.SERVER)) { + + s = (s.replace(MulticastPulseClient.SERVER, "")); + final String group = s.substring(0, s.indexOf(':')); + s = s.substring(group.length() + 1); + + if (!"*".equals(forGroup) && !forGroup.equals(group)) { + continue; + } + + final String services = s.substring(0, s.lastIndexOf('|')); + s = s.substring(services.length() + 1); + + final String[] serviceList = services.split("\\|"); + final String[] hosts = s.split(","); + + for (final String svc : serviceList) { + + if (EMPTY.equals(svc)) { + continue; + } + + final URI serviceUri; + try { + serviceUri = URI.create(svc); + } catch (final Exception e) { + continue; + } + + if (schemes.contains(serviceUri.getScheme())) { + + //Just because multicast was received on this host is does not mean the service is on the same + //We can however use this to identify an individual machine and group + final String serverHost = ((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress(); + + final String serviceHost = serviceUri.getHost(); + if (MulticastPulseClient.isLocalAddress(serviceHost, false)) { + if (!MulticastPulseClient.isLocalAddress(serverHost, false)) { + //A local service is only available to a local client + continue; + } + } + + final String svcfull = ("mp-" + serverHost + ":" + group + ":" + svc); + + setLock.lock(); + + try { + if (svcfull.contains("0.0.0.0")) { + for (final String h : hosts) { + if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo + set.add(URI.create(svcfull.replace("0.0.0.0", ipFormat(h)))); + } + } + } else if (svcfull.contains("[::]")) { + for (final String h : hosts) { + if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo + set.add(URI.create(svcfull.replace("[::]", ipFormat(h)))); + } + } + } else { + //Just add as is + set.add(URI.create(svcfull)); + } + } catch (final Exception e) { + //Ignore + } finally { + setLock.unlock(); + } + } + } + } + } + + } catch (final Exception e) { + //Ignore + } + } + } finally { + try { + socket.leaveGroup(ia); + } catch (final Exception e) { + //Ignore + } + try { + socket.close(); + } catch (final Exception e) { + //Ignore + } + } + } + })); + } + + try { + //Give listener threads a reasonable amount of time to start + if (latchListeners.await(clientSocketsFinal.length * 2, TimeUnit.SECONDS)) { + + //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout + //This pulse is designed to tell a listening server to wake up and pulse back a response + futures.add(0, getExecutorService().submit(new Runnable() { + @Override + public void run() { + while (running.get()) { + //Pulse to listening servers - It is thread safe to use same sockets as send/receive synchronization is only on the packet + for (final MulticastSocket socket : clientSocketsFinal) { + + if (running.get()) { + try { + socket.send(request); + } catch (final Exception e) { + //Ignore + } + } else { + break; + } + } + + if (running.get()) { + try { + Thread.sleep(10); + } catch (final InterruptedException e) { + break; + } + } + } + } + })); + } else { + timeout = 1; + } + + } catch (final InterruptedException e) { + //Terminate as quickly as possible + timeout = 1; + } + + //Kill the threads after timeout + timer.schedule(new TimerTask() { + @Override + public void run() { + + running.set(false); + + try { + for (final Future future : futures) { + future.cancel(true); + } + } catch (final ConcurrentModificationException e) { + //Ignore + } + + } + }, timeout); + + //Wait for threads to complete + for (final Future future : futures) { + try { + future.get(); + } catch (final Exception e) { + //Ignore + } + } + + setLock.lock(); + try { + return new TreeSet<URI>(set); + } finally { + setLock.unlock(); + } + } finally { + + //Just to be sure we are clean + for (final Future future : futures) { + try { + future.cancel(true); + } catch (final Exception e) { + //Ignore + } + } + + futures.clear(); + + for (final MulticastSocket socket : clientSockets) { + + try { + socket.leaveGroup(ia); + } catch (final Exception e) { + //Ignore + } + try { + socket.close(); + } catch (final Exception e) { + //Ignore + } + } + } + } + + private static InetAddress getAddress(final String host) throws Exception { + final InetAddress ia; + try { + ia = InetAddress.getByName(host); + } catch (final UnknownHostException e) { + throw new Exception(host + " is not a valid address", e); + } + + if (null == ia || !ia.isMulticastAddress()) { + throw new Exception(host + " is not a valid multicast address"); + } + return ia; + } + + /** + * Is the provided host a local host + * + * @param host The host to test + * @param wildcardIsLocal Should 0.0.0.0 or [::] be deemed as local + * @return True is the host is a local host else false + */ + public static boolean isLocalAddress(final String host, final boolean wildcardIsLocal) { + + final InetAddress addr; + try { + addr = InetAddress.getByName(host); + } catch (final UnknownHostException e) { + return false; + } + + // Check if the address is a valid special local or loop back + if ((wildcardIsLocal && addr.isAnyLocalAddress()) || addr.isLoopbackAddress()) { + return true; + } + + // Check if the address is defined on any local interface + try { + return NetworkInterface.getByInetAddress(addr) != null; + } catch (final SocketException e) { + return false; + } + } + + private static String ipFormat(final String h) throws UnknownHostException { + + final InetAddress ia = InetAddress.getByName(h); + if (ia instanceof Inet6Address) { + return "[" + ia.getHostAddress() + "]"; + } else { + return h; + } + } + + public static MulticastSocket[] getSockets(final InetAddress ia, final int port) throws Exception { + + final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>(); + + for (final NetworkInterface ni : getInterfaces()) { + + MulticastSocket ms = null; + + try { + + ms = new MulticastSocket(port); + ms.setNetworkInterface(ni); + ms.setSoTimeout(0); + ms.setTimeToLive(TTL); + if (!ms.getBroadcast()) { + ms.setBroadcast(true); + } + ms.joinGroup(ia); + + list.add(ms); + + } catch (final Exception e) { + + if (null != ms) { + try { + ms.close(); + } catch (final Exception t) { + //Ignore + } + } + + } + } + + return list.toArray(new MulticastSocket[list.size()]); + } + + private static NetworkInterface[] getNetworkInterfaces() { + + final HashSet<NetworkInterface> list = new HashSet<NetworkInterface>(); + + try { + final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + final NetworkInterface next = interfaces.nextElement(); + + if (next.supportsMulticast() && next.isUp()) { + list.add(next); + } + } + } catch (final SocketException e) { + //Ignore + } + + return list.toArray(new NetworkInterface[list.size()]); + } + + private static final CommandParser cmd = new CommandParser() { + @Override + protected void init() { + category("Options"); + + opt('g', "group").type(String.class).value("*").description("Group name"); + + opt('h', "host").type(String.class).value("239.255.3.2").description("Multicast address"); + + opt('p', "port").type(int.class).value(6142).description("Multicast port"); + + opt('t', "timeout").type(int.class).value(1000).description("Pulse back timeout"); + } + + @Override + protected List<String> validate(final Arguments arguments) { + return super.validate(arguments); + } + + @Override + protected List<String> usage() { + return super.usage(); + } + }; + + @SuppressWarnings("UseOfSystemOutOrSystemErr") + public static void main(final String[] args) throws Exception { + + final CommandParser.Arguments arguments; + try { + arguments = cmd.parse(args); + } catch (final CommandParser.HelpException e) { + System.exit(0); + throw new Exception(); // never reached, but keeps compiler happy + } catch (final CommandParser.InvalidOptionsException e) { + System.exit(1); + throw new Exception(); // never reached, but keeps compiler happy + } + + final Options options = arguments.options(); + + final String discover = options.get("group", "*"); + final String mchost = options.get("host", "239.255.3.2"); + final int mcport = options.get("port", 6142); + final int timeout = options.get("timeout", 1500); + + System.out.println(String.format("Using discovery options group=%1$s, host=%2$s, port=%3$s, timeout=%4$s", discover, mchost, mcport, timeout)); + System.out.println(); + + final AtomicBoolean running = new AtomicBoolean(true); + + final Thread t = new Thread(new Runnable() { + @SuppressWarnings("UseOfSystemOutOrSystemErr") + @Override + public void run() { + while (running.get()) { + + Set<URI> uriSet = null; + try { + uriSet = MulticastPulseClient.discoverURIs(discover, new HashSet<String>(Arrays.asList("ejbd", "ejbds", "http", "https")), mchost, mcport, timeout); + } catch (final Exception e) { + System.err.println(e.getMessage()); + } + + final int size = uriSet.size(); + if (uriSet != null && size > 0) { + + final int st = (timeout / size); + + for (final URI uri : uriSet) { + + final String server = uri.getScheme().replace("mp-", ""); + URI uriSub = URI.create(uri.getSchemeSpecificPart()); + + final String group = uriSub.getScheme(); + uriSub = URI.create(uriSub.getSchemeSpecificPart()); + + final String host = uriSub.getHost(); + final int port = uriSub.getPort(); + + if (MulticastPulseClient.isLocalAddress(host, false) && !MulticastPulseClient.isLocalAddress(server, false)) { + System.out.println(server + ":" + group + " - " + uriSub.toASCIIString() + " is not a local service"); + continue; + } + + System.out.print(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: "); + + boolean b = false; + final Socket s = new Socket(); + try { + s.connect(new InetSocketAddress(host, port), st); + b = true; + } catch (final Exception e) { + if (java.net.SocketTimeoutException.class.isInstance(e) || SocketException.class.isInstance(e)) { + MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport); + System.out.print("" + e + " : "); + } + } finally { + try { + s.close(); + } catch (final Exception e) { + //Ignore + } + } + + System.out.println(b); + } + } else { + System.out.println("### Failed to discover server: " + discover); + } + + System.out.println("."); + + try { + Thread.sleep(500); + } catch (final InterruptedException e) { + //Ignore + } + } + } + }, "MulticastPulseClient Test"); + + t.setDaemon(true); + t.start(); + + //noinspection ResultOfMethodCallIgnored + System.in.read(); + + running.set(false); + t.interrupt(); + } + + /** + * Asynchronous attempt to broadcast a bad URI on our channel. + * Hopefully the culprit server will hear this and stop sending it. + * + * @param uri Bad URI to broadcast + */ + private static void broadcastBadUri(final String group, final URI uri, final String host, final int port) { + + getExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + final InetAddress ia = getAddress(host); + + final byte[] bytes = (MulticastPulseClient.CLIENT + group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8); + final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port)); + + final MulticastSocket[] multicastSockets = MulticastPulseClient.getSockets(ia, port); + + for (final MulticastSocket socket : multicastSockets) { + + try { + socket.send(request); + } catch (final Exception e) { + log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e); + } + } + } catch (final Exception e) { + log.log(Level.WARNING, "Failed to broadcast bad URI: " + uri, e); + } + } + }); + } +}
http://git-wip-us.apache.org/repos/asf/tomee/blob/bd7dbd0f/server/openejb-client/src/main/java/org/apache/openejb/client/ProtocolMetaData.java ---------------------------------------------------------------------- diff --git a/server/openejb-client/src/main/java/org/apache/openejb/client/ProtocolMetaData.java b/server/openejb-client/src/main/java/org/apache/openejb/client/ProtocolMetaData.java index b19252b..95a120f 100644 --- a/server/openejb-client/src/main/java/org/apache/openejb/client/ProtocolMetaData.java +++ b/server/openejb-client/src/main/java/org/apache/openejb/client/ProtocolMetaData.java @@ -1,112 +1,112 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openejb.client; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; - -/** - * OpenEJB Enterprise Javabean Protocol (OEJP) - * <p/> - * OEJP uses a "<major>.<minor>" numbering scheme to indicate versions of the protocol. - * <p/> - * Protocol-Version = "OEJP" "/" 1*DIGIT "." 1*DIGIT - * <p/> - * Some compatability is guaranteed with the major part of the version number. - * - * @version $Revision$ $Date$ - */ -@SuppressWarnings("UnusedDeclaration") -public class ProtocolMetaData { - - public static final String VERSION = "4.7"; - - private static final String OEJB = "OEJP"; - private transient String id; - private transient int major; - private transient int minor; - - public ProtocolMetaData() { - init(OEJB + "/" + VERSION); - } - - public ProtocolMetaData(final String version) { - init(OEJB + "/" + version); - } - - private void init(final String spec) { - - if (!spec.matches("^OEJP/[0-9]\\.[0-9]$")) { - throw new RuntimeException("Protocol version spec must follow format [ \"OEJB\" \"/\" 1*DIGIT \".\" 1*DIGIT ] - " + spec); - } - - final char[] chars = new char[8]; - spec.getChars(0, chars.length, chars, 0); - - this.id = new String(chars, 0, 4); - this.major = Integer.parseInt(new String(chars, 5, 1)); - this.minor = Integer.parseInt(new String(chars, 7, 1)); - } - - public boolean isAtLeast(final int major, final int minor) { - return this.major >= major && (this.major != major || this.minor >= minor); - } - - public String getId() { - return id; - } - - public int getMajor() { - return major; - } - - public int getMinor() { - return minor; - } - - public String getVersion() { - return major + "." + minor; - } - - public String getSpec() { - return id + "/" + major + "." + minor; - } - - public void writeExternal(final OutputStream out) throws IOException { - out.write(getSpec().getBytes("UTF-8")); - out.flush(); - } - - public void readExternal(final InputStream in) throws IOException { - final byte[] spec = new byte[8]; - for (int i = 0; i < spec.length; i++) { - spec[i] = (byte) in.read(); - if (spec[i] == -1) { - throw new EOFException("Unable to read protocol version. Reached the end of the stream."); - } - } - try { - init(new String(spec, "UTF-8")); - } catch (final Throwable e) { - throw new IOException("Failed to read spec: " + Arrays.toString(spec), e); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openejb.client; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; + +/** + * OpenEJB Enterprise Javabean Protocol (OEJP) + * <p/> + * OEJP uses a "<major>.<minor>" numbering scheme to indicate versions of the protocol. + * <p/> + * Protocol-Version = "OEJP" "/" 1*DIGIT "." 1*DIGIT + * <p/> + * Some compatability is guaranteed with the major part of the version number. + * + * @version $Revision$ $Date$ + */ +@SuppressWarnings("UnusedDeclaration") +public class ProtocolMetaData { + + public static final String VERSION = "4.7"; + + private static final String OEJB = "OEJP"; + private transient String id; + private transient int major; + private transient int minor; + + public ProtocolMetaData() { + init(OEJB + "/" + VERSION); + } + + public ProtocolMetaData(final String version) { + init(OEJB + "/" + version); + } + + private void init(final String spec) { + + if (!spec.matches("^OEJP/[0-9]\\.[0-9]$")) { + throw new RuntimeException("Protocol version spec must follow format [ \"OEJB\" \"/\" 1*DIGIT \".\" 1*DIGIT ] - " + spec); + } + + final char[] chars = new char[8]; + spec.getChars(0, chars.length, chars, 0); + + this.id = new String(chars, 0, 4); + this.major = Integer.parseInt(new String(chars, 5, 1)); + this.minor = Integer.parseInt(new String(chars, 7, 1)); + } + + public boolean isAtLeast(final int major, final int minor) { + return this.major >= major && (this.major != major || this.minor >= minor); + } + + public String getId() { + return id; + } + + public int getMajor() { + return major; + } + + public int getMinor() { + return minor; + } + + public String getVersion() { + return major + "." + minor; + } + + public String getSpec() { + return id + "/" + major + "." + minor; + } + + public void writeExternal(final OutputStream out) throws IOException { + out.write(getSpec().getBytes("UTF-8")); + out.flush(); + } + + public void readExternal(final InputStream in) throws IOException { + final byte[] spec = new byte[8]; + for (int i = 0; i < spec.length; i++) { + spec[i] = (byte) in.read(); + if (spec[i] == -1) { + throw new EOFException("Unable to read protocol version. Reached the end of the stream."); + } + } + try { + init(new String(spec, "UTF-8")); + } catch (final Throwable e) { + throw new IOException("Failed to read spec: " + Arrays.toString(spec), e); + } + } +} http://git-wip-us.apache.org/repos/asf/tomee/blob/bd7dbd0f/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/AutoJAXRSInvoker.java ---------------------------------------------------------------------- diff --git a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/AutoJAXRSInvoker.java b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/AutoJAXRSInvoker.java index 012cb05..579f7eb 100644 --- a/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/AutoJAXRSInvoker.java +++ b/server/openejb-cxf-rs/src/main/java/org/apache/openejb/server/cxf/rs/AutoJAXRSInvoker.java @@ -1,70 +1,70 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.openejb.server.cxf.rs; - -import org.apache.cxf.jaxrs.JAXRSInvoker; -import org.apache.cxf.jaxrs.model.ClassResourceInfo; -import org.apache.cxf.message.Exchange; -import org.apache.cxf.service.invoker.Invoker; -import org.apache.openejb.BeanContext; -import org.apache.openejb.BeanType; -import org.apache.openejb.server.rest.EJBRestServiceInfo; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -public class AutoJAXRSInvoker implements Invoker { - private final Map<String, EJBRestServiceInfo> ejbs; - private final OpenEJBEJBInvoker ejbInvoker; - private final JAXRSInvoker jaxrsInvoker; - - public AutoJAXRSInvoker(final Map<String, EJBRestServiceInfo> restEjbs) { - ejbs = restEjbs; - - // delegates - jaxrsInvoker = new PojoInvoker(); - if (!ejbs.isEmpty()) { - ejbInvoker = new OpenEJBEJBInvoker(beanContexts(restEjbs)); - } else { - ejbInvoker = null; // no need - } - } - - private static Collection<BeanContext> beanContexts(final Map<String, EJBRestServiceInfo> restEjbs) { - final Collection<BeanContext> bc = new ArrayList<BeanContext>(); - for (final EJBRestServiceInfo i : restEjbs.values()) { - bc.add(i.context); - } - return bc; - } - - @Override - public Object invoke(final Exchange exchange, final Object o) { // mainly a select the right invoker impl - final ClassResourceInfo cri = (ClassResourceInfo) exchange.get("root.resource.class"); - - if (cri != null) { - final String clazz = cri.getServiceClass().getName(); - final EJBRestServiceInfo restServiceInfo = ejbs.get(clazz); - if (restServiceInfo != null && !BeanType.MANAGED.equals(restServiceInfo.context.getComponentType())) { - return ejbInvoker.invoke(exchange, o); - } - } - - return jaxrsInvoker.invoke(exchange, o); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.server.cxf.rs; + +import org.apache.cxf.jaxrs.JAXRSInvoker; +import org.apache.cxf.jaxrs.model.ClassResourceInfo; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.service.invoker.Invoker; +import org.apache.openejb.BeanContext; +import org.apache.openejb.BeanType; +import org.apache.openejb.server.rest.EJBRestServiceInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +public class AutoJAXRSInvoker implements Invoker { + private final Map<String, EJBRestServiceInfo> ejbs; + private final OpenEJBEJBInvoker ejbInvoker; + private final JAXRSInvoker jaxrsInvoker; + + public AutoJAXRSInvoker(final Map<String, EJBRestServiceInfo> restEjbs) { + ejbs = restEjbs; + + // delegates + jaxrsInvoker = new PojoInvoker(); + if (!ejbs.isEmpty()) { + ejbInvoker = new OpenEJBEJBInvoker(beanContexts(restEjbs)); + } else { + ejbInvoker = null; // no need + } + } + + private static Collection<BeanContext> beanContexts(final Map<String, EJBRestServiceInfo> restEjbs) { + final Collection<BeanContext> bc = new ArrayList<BeanContext>(); + for (final EJBRestServiceInfo i : restEjbs.values()) { + bc.add(i.context); + } + return bc; + } + + @Override + public Object invoke(final Exchange exchange, final Object o) { // mainly a select the right invoker impl + final ClassResourceInfo cri = (ClassResourceInfo) exchange.get("root.resource.class"); + + if (cri != null) { + final String clazz = cri.getServiceClass().getName(); + final EJBRestServiceInfo restServiceInfo = ejbs.get(clazz); + if (restServiceInfo != null && !BeanType.MANAGED.equals(restServiceInfo.context.getComponentType())) { + return ejbInvoker.invoke(exchange, o); + } + } + + return jaxrsInvoker.invoke(exchange, o); + } +}
