http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-commons/src/main/java/org/apache/activemq6/utils/UUIDTimer.java ---------------------------------------------------------------------- diff --git a/activemq6-commons/src/main/java/org/apache/activemq6/utils/UUIDTimer.java b/activemq6-commons/src/main/java/org/apache/activemq6/utils/UUIDTimer.java new file mode 100644 index 0000000..60b41d7 --- /dev/null +++ b/activemq6-commons/src/main/java/org/apache/activemq6/utils/UUIDTimer.java @@ -0,0 +1,324 @@ +/* JUG Java Uuid Generator + * + * Copyright (c) 2002- Tatu Saloranta, [email protected] + * + * Licensed under the License specified in the file licenses/LICENSE.txt which is + * included with the source code. + * You may not use this file except in compliance with the License. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq6.utils; + +import java.util.Random; + +/** + * UUIDTimer produces the time stamps required for time-based UUIDs. It works as + * outlined in the UUID specification, with the following implementation: + * <ul> + * <li>Java classes can only produce time stamps with maximum resolution of one + * millisecond (at least before JDK 1.5). To compensate, an additional counter + * is used, so that more than one UUID can be generated between java clock + * updates. Counter may be used to generate up to 10000 UUIDs for each distinct + * java clock value. + * <li>Due to even lower clock resolution on some platforms (older Windows + * versions use 55 msec resolution), timestamp value can also be advanced ahead of + * physical value within limits (by default, up to 100 milliseconds ahead of + * reported), iff necessary (ie. 
10000 instances created before clock time + * advances). + * <li>As an additional precaution, counter is initialized not to 0 but to a + * random 8-bit number, and each time clock changes, lowest 8-bits of counter + * are preserved. The purpose is to reduce the likelihood of multi-JVM multi-instance + * generators colliding, without significantly reducing max. UUID generation + * speed. Note though that using more than one generator (from separate JVMs) is + * strongly discouraged, so hopefully this enhancement isn't needed. This 8-bit + * offset has to be subtracted from total max. UUID count to preserve ordering + * property of UUIDs (ie. one can see which UUID was generated first for given + * UUID generator); the resulting 9500 UUIDs isn't much different from the + * optimal choice. + * <li>Finally, as of version 2.0 and onwards, optional external timestamp + * synchronization can be done. This is done similarly to the way UUID + * specification suggests; except that since there is no way to lock the whole + * system, file-based locking is used. This works between multiple JVMs and Jug + * instances. + * </ul> + * <p> + * Some additional assumptions about calculating the timestamp: + * <ul> + * <li>System.currentTimeMillis() is assumed to give time offset in UTC, or at + * least close enough thing to get correct timestamps. The alternate route would + * have to go through calendar object, use TimeZone offset to get to UTC, and + * then modify. Using currentTimeMillis should be much faster to allow rapid + * UUID creation. + * <li>Similarly, the constant used for time offset between 1.1.1970 and start + * of Gregorian calendar is assumed to be correct (which seems to be the case + * when testing with Java calendars). + * </ul> + * <p> + * Note about synchronization: this class is assumed to always be called from a + * synchronized context (caller locks on either this object, or a similar timer + * lock), and so has no method synchronization. 
+ */ +public class UUIDTimer +{ + // // // Constants + + /** + * Since System.longTimeMillis() returns time from january 1st 1970, and + * UUIDs need time from the beginning of gregorian calendar (15-oct-1582), + * need to apply the offset: + */ + private static final long kClockOffset = 0x01b21dd213814000L; + + /** + * Also, instead of getting time in units of 100nsecs, we get something with + * max resolution of 1 msec... and need the multiplier as well + */ + private static final long kClockMultiplier = 10000; + + private static final long kClockMultiplierL = 10000L; + + /** + * Let's allow "virtual" system time to advance at most 100 milliseconds + * beyond actual physical system time, before adding delays. + */ + private static final long kMaxClockAdvance = 100L; + + // // // Configuration + + private final Random mRnd; + + // // // Clock state: + + /** + * Additional state information used to protect against anomalous cases + * (clock time going backwards, node id getting mixed up). Third byte is + * actually used for seeding counter on counter overflow. + */ + private final byte[] mClockSequence = new byte[3]; + + /** + * Last physical timestamp value <code>System.currentTimeMillis()</code> + * returned: used to catch (and report) cases where system clock goes + * backwards. Is also used to limit "drifting", that is, amount timestamps + * used can differ from the system time value. This value is not guaranteed + * to be monotonically increasing. + */ + private long mLastSystemTimestamp = 0L; + + /** + * Timestamp value last used for generating a UUID (along with + * {@link #mClockCounter}. Usually the same as {@link #mLastSystemTimestamp}, + * but not always (system clock moved backwards). Note that this value is + * guaranteed to be monotonically increasing; that is, at given absolute time + * points t1 and t2 (where t2 is after t1), t1 <= t2 will always hold true. 
+ */ + private long mLastUsedTimestamp = 0L; + + /** + * Counter used to compensate inadequate resolution of JDK system timer. + */ + private int mClockCounter = 0; + + UUIDTimer(final Random rnd) + { + mRnd = rnd; + initCounters(rnd); + mLastSystemTimestamp = 0L; + // This may get overwritten by the synchronizer + mLastUsedTimestamp = 0L; + } + + private void initCounters(final Random rnd) + { + /* + * Let's generate the clock sequence field now; as with counter, this + * reduces likelihood of collisions (as explained in UUID specs) + */ + rnd.nextBytes(mClockSequence); + /* + * Ok, let's also initialize the counter... Counter is used to make it + * slightly less likely that two instances of UUIDGenerator (from separate + * JVMs as no more than one can be created in one JVM) would produce + * colliding time-based UUIDs. The practice of using multiple generators, + * is strongly discouraged, of course, but just in case... + */ + mClockCounter = mClockSequence[2] & 0xFF; + } + + public void getTimestamp(final byte[] uuidData) + { + // First the clock sequence: + uuidData[UUID.INDEX_CLOCK_SEQUENCE] = mClockSequence[0]; + uuidData[UUID.INDEX_CLOCK_SEQUENCE + 1] = mClockSequence[1]; + + long systime = System.currentTimeMillis(); + + /* + * Let's first verify that the system time is not going backwards; + * independent of whether we can use it: + */ + if (systime < mLastSystemTimestamp) + { + // Logger.logWarning("System time going backwards! (got value + // "+systime+", last "+mLastSystemTimestamp); + // Let's write it down, still + mLastSystemTimestamp = systime; + } + + /* + * But even without it going backwards, it may be less than the last one + * used (when generating UUIDs fast with coarse clock resolution; or if + * clock has gone backwards over reboot etc). 
+ */ + if (systime <= mLastUsedTimestamp) + { + /* + * Can we just use the last time stamp (ok if the counter hasn't hit + * max yet) + */ + if (mClockCounter < UUIDTimer.kClockMultiplier) + { // yup, still have room + systime = mLastUsedTimestamp; + } + else + { // nope, have to roll over to next value and maybe wait + long actDiff = mLastUsedTimestamp - systime; + long origTime = systime; + systime = mLastUsedTimestamp + 1L; + + // Logger.logWarning("Timestamp over-run: need to reinitialize + // random sequence"); + + /* + * Clock counter is now at exactly the multiplier; no use just + * anding its value. So, we better get some random numbers + * instead... + */ + initCounters(mRnd); + + /* + * But do we also need to slow down? (to try to keep virtual time + * close to physical time; ie. either catch up when system clock has + * been moved backwards, or when coarse clock resolution has forced + * us to advance virtual timer too far) + */ + if (actDiff >= UUIDTimer.kMaxClockAdvance) + { + UUIDTimer.slowDown(origTime, actDiff); + } + } + } + else + { + /* + * Clock has advanced normally; just need to make sure counter is reset + * to a low value (need not be 0; good to leave a small residual to + * further decrease collisions) + */ + mClockCounter &= 0xFF; + } + + mLastUsedTimestamp = systime; + + /* + * Now, let's translate the timestamp to one UUID needs, 100ns unit offset + * from the beginning of Gregorian calendar... 
+ */ + systime *= UUIDTimer.kClockMultiplierL; + systime += UUIDTimer.kClockOffset; + + // Plus add the clock counter: + systime += mClockCounter; + // and then increase + ++mClockCounter; + + /* + * Time fields are nicely split across the UUID, so can't just linearly + * dump the stamp: + */ + int clockHi = (int) (systime >>> 32); + int clockLo = (int) systime; + + uuidData[UUID.INDEX_CLOCK_HI] = (byte) (clockHi >>> 24); + uuidData[UUID.INDEX_CLOCK_HI + 1] = (byte) (clockHi >>> 16); + uuidData[UUID.INDEX_CLOCK_MID] = (byte) (clockHi >>> 8); + uuidData[UUID.INDEX_CLOCK_MID + 1] = (byte) clockHi; + + uuidData[UUID.INDEX_CLOCK_LO] = (byte) (clockLo >>> 24); + uuidData[UUID.INDEX_CLOCK_LO + 1] = (byte) (clockLo >>> 16); + uuidData[UUID.INDEX_CLOCK_LO + 2] = (byte) (clockLo >>> 8); + uuidData[UUID.INDEX_CLOCK_LO + 3] = (byte) clockLo; + } + + /* + * /////////////////////////////////////////////////////////// // Private + * methods /////////////////////////////////////////////////////////// + */ + + private static final int MAX_WAIT_COUNT = 50; + + /** + * Simple utility method to use to wait for couple of milliseconds, to let + * system clock hopefully advance closer to the virtual timestamps used. + * Delay is kept to just a millisecond or two, to prevent excessive blocking; + * but that should be enough to eventually synchronize physical clock with + * virtual clock values used for UUIDs. + * + */ + private static void slowDown(final long startTime, final long actDiff) + { + /* + * First, let's determine how long we'd like to wait. This is based on how + * far ahead are we as of now. 
+ */ + long ratio = actDiff / UUIDTimer.kMaxClockAdvance; + long delay; + + if (ratio < 2L) + { // 200 msecs or less + delay = 1L; + } + else if (ratio < 10L) + { // 1 second or less + delay = 2L; + } + else if (ratio < 600L) + { // 1 minute or less + delay = 3L; + } + else + { + delay = 5L; + } + // Logger.logWarning("Need to wait for "+delay+" milliseconds; virtual + // clock advanced too far in the future"); + long waitUntil = startTime + delay; + int counter = 0; + do + { + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + } + delay = 1L; + /* + * This is just a sanity check: don't want an "infinite" loop if clock + * happened to be moved backwards by, say, an hour... + */ + if (++counter > UUIDTimer.MAX_WAIT_COUNT) + { + break; + } + } + while (System.currentTimeMillis() < waitUntil); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-commons/src/test/java/org/apache/activemq6/utils/ByteUtilTest.java ---------------------------------------------------------------------- diff --git a/activemq6-commons/src/test/java/org/apache/activemq6/utils/ByteUtilTest.java b/activemq6-commons/src/test/java/org/apache/activemq6/utils/ByteUtilTest.java new file mode 100644 index 0000000..ccec25e --- /dev/null +++ b/activemq6-commons/src/test/java/org/apache/activemq6/utils/ByteUtilTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.apache.activemq6.utils; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the hex-formatting helpers of {@link ByteUtil}. + * + * @author Clebert Suconic + */ + +public class ByteUtilTest +{ + /** + * Checks the output of {@code ByteUtil.bytesToHex} for the bytes 0..3 when + * called with 2, 1 and 3 as second argument. + */ + @Test + public void testBytesToString() + { + byte[] byteArray = new byte[] {0, 1, 2, 3}; + + testEquals("0001 0203", ByteUtil.bytesToHex(byteArray, 2)); + testEquals("00 01 02 03", ByteUtil.bytesToHex(byteArray, 1)); + testEquals("000102 03", ByteUtil.bytesToHex(byteArray, 3)); + } + + + /** + * Prints the result of {@code ByteUtil.maxString} applied to the hex form of + * a 20 KiB zero-filled array. + * NOTE(review): nothing is asserted, so this only fails if a call throws. + */ + @Test + public void testMaxString() + { + byte[] byteArray = new byte[20 * 1024]; + System.out.println(ByteUtil.maxString(ByteUtil.bytesToHex(byteArray, 2),150)); + } + + + /** + * Fails the running test when the two strings differ. + * + * @param string1 expected value; must not be null + * @param string2 actual value + */ + void testEquals(String string1, String string2) + { + if (!string1.equals(string2)) + { + Assert.fail("String are not the same:=" + string1 + "!=" + string2); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-commons/src/test/java/org/apache/activemq6/utils/PairTest.java ---------------------------------------------------------------------- diff --git a/activemq6-commons/src/test/java/org/apache/activemq6/utils/PairTest.java b/activemq6-commons/src/test/java/org/apache/activemq6/utils/PairTest.java new file mode 100644 index 0000000..15df799 --- /dev/null +++ b/activemq6-commons/src/test/java/org/apache/activemq6/utils/PairTest.java @@ -0,0 +1,32 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.utils; + +import org.junit.Test; + +import org.junit.Assert; +import org.apache.activemq6.api.core.Pair; + + +/** + * Test for the hash code of {@link Pair}. + */ +public class PairTest extends Assert +{ + + /** + * Verifies that the hash code of a pair (12, 13) changes after its first + * element is replaced with null. + */ + @Test + public void testPair() + { + Pair<Integer, Integer> p = new Pair<Integer, Integer>(Integer.valueOf(12), Integer.valueOf(13)); + int hash = p.hashCode(); + p.setA(null); + assertTrue(hash != p.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-commons/src/test/java/org/apache/activemq6/utils/ReferenceCounterTest.java ---------------------------------------------------------------------- diff --git a/activemq6-commons/src/test/java/org/apache/activemq6/utils/ReferenceCounterTest.java b/activemq6-commons/src/test/java/org/apache/activemq6/utils/ReferenceCounterTest.java new file mode 100644 index 0000000..860b983 --- /dev/null +++ b/activemq6-commons/src/test/java/org/apache/activemq6/utils/ReferenceCounterTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.utils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that the task given to {@link ReferenceCounterUtil} is run exactly once + * after matching increments and decrements, with and without an executor. + * + * @author Clebert Suconic + */ + +public class ReferenceCounterTest extends Assert +{ + + /** + * Task handed to the counter: records the thread it ran on, counts its + * invocations and releases a latch. + */ + class LatchRunner implements Runnable + { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger counts = new AtomicInteger(0); + volatile Thread lastThreadUsed; + + public void run() + { + // Record the executing thread before releasing the latch, so a waiter + // that gets past await() always sees it. + lastThreadUsed = Thread.currentThread(); + counts.incrementAndGet(); + latch.countDown(); + } + } + + /** + * Runs the shared scenario with a counter created without an executor. + */ + @Test + public void testReferenceNoExecutor() throws Exception + { + internalTestReferenceNoExecutor(null); + } + + /** + * Runs the shared scenario with a counter backed by a single-thread executor. + */ + @Test + public void testReferenceWithExecutor() throws Exception + { + ExecutorService executor = Executors.newSingleThreadExecutor(); + internalTestReferenceNoExecutor(executor); + executor.shutdown(); + } + + /** + * With an executor configured, the task must run, and must not run on the + * calling thread. + */ + @Test + public void testReferenceValidExecutorUsed() throws Exception + { + ExecutorService executor = Executors.newSingleThreadExecutor(); + LatchRunner runner = new LatchRunner(); + ReferenceCounterUtil counter = new ReferenceCounterUtil(runner, executor); + counter.increment(); + counter.decrement(); + + // Fail if the task never ran, instead of comparing a null thread below + assertTrue(runner.latch.await(5, TimeUnit.SECONDS)); + + assertNotNull(runner.lastThreadUsed); + assertNotSame(runner.lastThreadUsed, Thread.currentThread()); + + executor.shutdown(); + } + + /** + * Shared scenario: 100 threads increment the counter, then 100 threads + * decrement it; the task is expected to have run exactly once afterwards. + * + * @param executor executor to hand to the counter, or null to create the + * counter without one + */ + public void internalTestReferenceNoExecutor(Executor executor) throws Exception + { + LatchRunner runner = new LatchRunner(); + + final ReferenceCounterUtil ref; + + if (executor == null) + { + ref = new ReferenceCounterUtil(runner); + } + else + { + ref = new ReferenceCounterUtil(runner, executor); + } + + Thread[] t = new Thread[100]; + + for (int i = 0; i < t.length; i++) + { + t[i] = new Thread() + { + public void run() + { + ref.increment(); + } + }; + t[i].start(); + } + + for (Thread tx 
: t) + { + tx.join(); + } + + for (int i = 0; i < t.length; i++) + { + t[i] = new Thread() + { + public void run() + { + ref.decrement(); + } + }; + t[i].start(); + } + + for (Thread tx : t) + { + tx.join(); + } + + assertTrue(runner.latch.await(5, TimeUnit.SECONDS)); + + assertEquals(1, runner.counts.get()); + + + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/pom.xml ---------------------------------------------------------------------- diff --git a/activemq6-core-client/pom.xml b/activemq6-core-client/pom.xml new file mode 100644 index 0000000..96034d1 --- /dev/null +++ b/activemq6-core-client/pom.xml @@ -0,0 +1,176 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-pom</artifactId> + <version>6.0.0-SNAPSHOT</version> + </parent> + + <artifactId>activemq6-core-client</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ6 Core Client</name> + + <properties> + <hornetq.basedir>${project.basedir}/..</hornetq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + </dependency> + <dependency> + <groupId>org.jgroups</groupId> + <artifactId>jgroups</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-commons</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-selector</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-journal</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + 
<groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>release</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9</version> + <configuration> + <doclet>org.jboss.apiviz.APIviz</doclet> + <docletArtifact> + <groupId>org.jboss.apiviz</groupId> + <artifactId>apiviz</artifactId> + <version>1.3.2.GA</version> + </docletArtifact> + <useStandardDocletOptions>true</useStandardDocletOptions> + <minmemory>128m</minmemory> + <maxmemory>512m</maxmemory> + <quiet>false</quiet> + <aggregate>true</aggregate> + <excludePackageNames>org.hornetq.core:org.hornetq.utils</excludePackageNames> + </configuration> + <executions> + <execution> + <id>javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <!-- Adds a directory to the list of source folders to be used in this project. This is + here to let Maven know about: + target/generated-sources/xslt/org/hornetq/api/config/HornetQDefaultConfiguration.java + which is generated by ./src/main/resources/hqDefaults.xsl using the HornetQ + configuration schema. 
--> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>./target/generated-sources/xslt</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>xml-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>transform</goal> + </goals> + </execution> + </executions> + <configuration> + <transformerFactory>net.sf.saxon.TransformerFactoryImpl</transformerFactory> + <transformationSets> + <transformationSet> + <!-- Generates target/generated-sources/xslt/.../HornetQDefaultConfiguration.java + using this XSLT `./src/main/resources/hqDefaults.xsl`. --> + <dir>../activemq6-server/src/main/resources/schema</dir> + <outputDir>${project.build.directory}/generated-sources/xslt</outputDir> + <stylesheet>./src/main/resources/hqDefaults.xsl</stylesheet> + <includes> + <include>hornetq-configuration.xsd</include> + </includes> + <fileMappers> + <fileMapper implementation="org.codehaus.plexus.components.io.filemappers.MergeFileMapper"> + <targetName>./org/apache/activemq6/api/config/HornetQDefaultConfiguration.java</targetName> + </fileMapper> + </fileMappers> + </transformationSet> + </transformationSets> + </configuration> + <dependencies> + <dependency> + <groupId>net.sf.saxon</groupId> + <artifactId>saxon</artifactId> + <version>8.7</version> + </dependency> + </dependencies> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <phase>test</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpoint.java 
---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpoint.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpoint.java new file mode 100644 index 0000000..b34e3f2 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpoint.java @@ -0,0 +1,95 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import java.util.concurrent.TimeUnit; + +/** + * BroadcastEndpoint is used in BroadcastGroups and DiscoveryGroups for topology updates. + * <p> + * A BroadcastEndpoint can perform one of the two following tasks: + * <ul> + * <li>when being used in BroadcastGroups, it broadcasts connector information</li> + * <li>when being used in DiscoveryGroups, it receives broadcasts</li> + * </ul> + * <p> + * The two tasks are mutually exclusive, meaning a BroadcastEndpoint can either be a broadcaster + * or a receiver, but not both. + * <p> + * It is an abstraction of various concrete broadcasting mechanisms. Different implementations + * of this interface may use different broadcasting techniques like UDP multicasting or + * JGroups channels. 
+ * + * @author Tomohisa + * @author Howard Gao + * @see JGroupsBroadcastEndpoint + * @see UDPBroadcastEndpoint + */ +public interface BroadcastEndpoint +{ + /** + * This method initializes a BroadcastEndpoint as + * a receiving end for broadcasts. After that data can be + * received using one of its receiveBroadcast() methods. + * + * @throws Exception if initialization fails + */ + void openClient() throws Exception; + + /** + * This method initializes a BroadcastEndpoint as + * a broadcaster. After that data can be sent + * via its broadcast() method. + * + * @throws Exception if initialization fails + */ + void openBroadcaster() throws Exception; + + /** + * Close the endpoint. Any related resources should + * be cleaned up in this method. + * + * @param isBroadcast : indicates whether this endpoint serves as a broadcaster or not. + * @throws Exception if closing fails + */ + void close(boolean isBroadcast) throws Exception; + + /** + * Broadcasts data to the cluster. + * + * @param data : a byte array containing the data. + * @throws Exception if the data cannot be sent + */ + void broadcast(byte[] data) throws Exception; + + /** + * Receives the broadcast data. It blocks until data is + * available. + * + * @return the received data as byte array. + * @throws Exception if receiving fails + */ + byte[] receiveBroadcast() throws Exception; + + /** + * Receives the broadcast data with a timeout. It blocks until either + * the data is available or the timeout is reached, whichever comes first. + * + * @param time : how long the method should wait for the data to arrive. + * @param unit : unit of the time. + * @return a byte array if data arrives within the timeout, or null if no data + * is available after the timeout. 
+ * @throws Exception if receiving fails + */ + byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactory.java new file mode 100644 index 0000000..5dc6e83 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.api.core; + +import java.io.Serializable; + + +/** + * Serializable factory for {@link BroadcastEndpoint} instances. + */ +public interface BroadcastEndpointFactory extends Serializable +{ + /** + * Creates a broadcast endpoint. + * NOTE(review): presumably the returned endpoint is new and not yet opened; + * confirm against the implementations. + * + * @return the endpoint + * @throws Exception if the endpoint cannot be created + */ + BroadcastEndpoint createBroadcastEndpoint() throws Exception; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactoryConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactoryConfiguration.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactoryConfiguration.java new file mode 100644 index 0000000..dd55518 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastEndpointFactoryConfiguration.java @@ -0,0 +1,25 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.apache.activemq6.api.core; + +import java.io.Serializable; + +/** + * Serializable configuration object from which a + * {@link BroadcastEndpointFactory} can be obtained. + * + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 9/25/12 + */ +public interface BroadcastEndpointFactoryConfiguration extends Serializable +{ + /** + * Creates the endpoint factory described by this configuration. + * + * @return the endpoint factory + */ + BroadcastEndpointFactory createBroadcastEndpointFactory(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastGroupConfiguration.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastGroupConfiguration.java new file mode 100644 index 0000000..e974343 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/BroadcastGroupConfiguration.java @@ -0,0 +1,136 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.api.core; + +import java.io.Serializable; +import java.util.List; + +import org.apache.activemq6.api.config.HornetQDefaultConfiguration; + + +/** + * The basic configuration used to determine how the server will broadcast members. + * This is analogous to {@link org.apache.activemq6.api.core.DiscoveryGroupConfiguration} + * <p> + * All setters return this instance so that calls can be chained. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +public final class BroadcastGroupConfiguration implements Serializable +{ + private static final long serialVersionUID = 2335634694112319124L; + + private String name = null; + + private long broadcastPeriod = HornetQDefaultConfiguration.getDefaultBroadcastPeriod(); + + private BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration = null; + + private List<String> connectorInfos = null; + + /** + * Creates a configuration with the default broadcast period and every + * other property set to null. + */ + public BroadcastGroupConfiguration() + { + } + + /** + * @return the configured name, or null if none has been set + */ + public String getName() + { + return name; + } + + /** + * @return the configured broadcast period + */ + public long getBroadcastPeriod() + { + return broadcastPeriod; + } + + /** + * @return the list passed to {@link #setConnectorInfos(List)} (the same + * instance, not a copy), or null if none has been set + */ + public List<String> getConnectorInfos() + { + return connectorInfos; + } + + /** + * @param name the new name + * @return this configuration + */ + public BroadcastGroupConfiguration setName(final String name) + { + this.name = name; + return this; + } + + /** + * @param broadcastPeriod the new broadcast period + * @return this configuration + */ + public BroadcastGroupConfiguration setBroadcastPeriod(final long broadcastPeriod) + { + this.broadcastPeriod = broadcastPeriod; + return this; + } + + /** + * @param connectorInfos the new connector list; stored by reference + * @return this configuration + */ + public BroadcastGroupConfiguration setConnectorInfos(final List<String> connectorInfos) + { + this.connectorInfos = connectorInfos; + return this; + } + + /** + * @return the endpoint factory configuration, or null if none has been set + */ + public BroadcastEndpointFactoryConfiguration getEndpointFactoryConfiguration() + { + return endpointFactoryConfiguration; + } + + /** + * @param endpointFactoryConfiguration the new endpoint factory configuration + * @return this configuration + */ + public BroadcastGroupConfiguration setEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration) + { + this.endpointFactoryConfiguration = endpointFactoryConfiguration; + return this; + } + + /** + * Hash code over the broadcast period, connector list, endpoint factory + * configuration and name; null properties contribute 0. + */ + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; 
+ result = prime * result + (int)(broadcastPeriod ^ (broadcastPeriod >>> 32)); + result = prime * result + ((connectorInfos == null) ? 0 : connectorInfos.hashCode()); + result = prime * result + ((endpointFactoryConfiguration == null) ? 0 : endpointFactoryConfiguration.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + /** + * Two configurations are equal when they are of this exact class and their + * broadcast period, connector list, endpoint factory configuration and name + * are all equal (two null values count as equal). + */ + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BroadcastGroupConfiguration other = (BroadcastGroupConfiguration)obj; + if (broadcastPeriod != other.broadcastPeriod) + return false; + if (connectorInfos == null) + { + if (other.connectorInfos != null) + return false; + } + else if (!connectorInfos.equals(other.connectorInfos)) + return false; + if (endpointFactoryConfiguration == null) + { + if (other.endpointFactoryConfiguration != null) + return false; + } + else if (!endpointFactoryConfiguration.equals(other.endpointFactoryConfiguration)) + return false; + if (name == null) + { + if (other.name != null) + return false; + } + else if (!name.equals(other.name)) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfiguration.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfiguration.java new file mode 100644 index 0000000..04641fe --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfiguration.java @@ -0,0 +1,198 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.utils.UUIDGenerator; + +/** + * This file represents how we are using Discovery. + * <p> + * The discovery configuration could either use plain UDP, or JGroups.<br> + * If using UDP, all the UDP properties will be filled and the jgroups properties will be + * {@code null}.<br> + * If using JGroups, all the UDP properties will be -1 or {@code null} and the jgroups properties + * will be filled.<br> + * If by any reason, both properties are filled, the JGroups takes precedence. That means, if + * {@code jgroupsFile != null} then the Grouping method used will be JGroups. 
+ * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author Clebert Suconic + */ +public final class DiscoveryGroupConfiguration implements Serializable +{ + private static final long serialVersionUID = 8657206421727863400L; + + private String name = UUIDGenerator.getInstance().generateStringUUID(); + + private long refreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; + + private long discoveryInitialWaitTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; + + /* + * The localBindAddress is needed so we can be backward compatible with 2.2 clients + * */ + private transient String localBindAddress = null; + + /* + * The localBindPort is needed so we can be backward compatible with 2.2 clients + * */ + private transient int localBindPort = -1; + + /* + * The groupAddress is needed so we can be backward compatible with 2.2 clients + * */ + private String groupAddress = null; + + /* + * The groupPort is needed so we can be backward compatible with 2.2 clients + * */ + private int groupPort = -1; + + /* + * This is the actual object used by the class, it has to be transient so we can handle deserialization with a 2.2 client + * */ + private transient BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration; + + public DiscoveryGroupConfiguration() + { + } + + public String getName() + { + return name; + } + + public long getRefreshTimeout() + { + return refreshTimeout; + } + + /** + * @param name the name to set + */ + public DiscoveryGroupConfiguration setName(final String name) + { + this.name = name; + return this; + } + + /** + * @param refreshTimeout the refreshTimeout to set + */ + public DiscoveryGroupConfiguration setRefreshTimeout(final long refreshTimeout) + { + this.refreshTimeout = refreshTimeout; + return this; + } + + /** + * @return the discoveryInitialWaitTimeout + */ + public long getDiscoveryInitialWaitTimeout() + { + return discoveryInitialWaitTimeout; + } + + /** + * @param discoveryInitialWaitTimeout the 
discoveryInitialWaitTimeout to set + */ + public DiscoveryGroupConfiguration setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout) + { + this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout; + return this; + } + + public BroadcastEndpointFactoryConfiguration getBroadcastEndpointFactoryConfiguration() + { + return endpointFactoryConfiguration; + } + + public DiscoveryGroupConfiguration setBroadcastEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration) + { + this.endpointFactoryConfiguration = endpointFactoryConfiguration; + if (endpointFactoryConfiguration instanceof DiscoveryGroupConfigurationCompatibilityHelper) + { + DiscoveryGroupConfigurationCompatibilityHelper dgcch = (DiscoveryGroupConfigurationCompatibilityHelper) endpointFactoryConfiguration; + localBindAddress = dgcch.getLocalBindAddress(); + localBindPort = dgcch.getLocalBindPort(); + groupAddress = dgcch.getGroupAddress(); + groupPort = dgcch.getGroupPort(); + } + return this; + } + + private void writeObject(ObjectOutputStream out) throws IOException + { + out.defaultWriteObject(); + if (groupPort < 0) + { + out.writeObject(endpointFactoryConfiguration); + } + } + + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException + { + in.defaultReadObject(); + if (groupPort < 0) + { + endpointFactoryConfiguration = (BroadcastEndpointFactoryConfiguration) in.readObject(); + } + else + { + endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort) + .setLocalBindAddress(localBindAddress) + .setLocalBindPort(localBindPort); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o; + + if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false; + if (refreshTimeout != 
that.refreshTimeout) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32)); + result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32)); + return result; + } + + @Override + public String toString() + { + return "DiscoveryGroupConfiguration{" + + "name='" + name + '\'' + + ", refreshTimeout=" + refreshTimeout + + ", discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java new file mode 100644 index 0000000..bf1dce4 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
/**
 * Makes a {@code DiscoveryGroupConfiguration} backward compatible with version 2.2
 * clients.
 * <p>
 * A {@link org.apache.activemq6.api.core.BroadcastEndpointFactoryConfiguration} that also
 * implements this interface exposes the four UDP attributes from which a version 2.2
 * style {@code DiscoveryGroupConfiguration} is formed at serialization time.
 * {@code DiscoveryGroupConfiguration} copies the four values when such a configuration is
 * set on it, and treats a negative group port as "no UDP attributes available": the
 * configuration object itself is then written to the stream instead.
 *
 * @see DiscoveryGroupConfiguration#readObject(java.io.ObjectInputStream)
 * @see DiscoveryGroupConfiguration#writeObject(java.io.ObjectOutputStream)
 *
 * @author Andy Taylor
 *         12/13/12
 */
public interface DiscoveryGroupConfigurationCompatibilityHelper
{
   /**
    * @return the local bind address to expose as a UDP attribute; may be {@code null}
    */
   String getLocalBindAddress();

   /**
    * @return the local bind port to expose as a UDP attribute; implementations without
    *         one return {@code -1}
    */
   int getLocalBindPort();

   /**
    * @return the UDP group address to expose; may be {@code null}
    */
   String getGroupAddress();

   /**
    * @return the UDP group port to expose; a negative value tells
    *         {@code DiscoveryGroupConfiguration} to serialize the configuration object
    *         itself rather than rely on the UDP attributes
    */
   int getGroupPort();
}
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +/** + * Constants representing pre-defined message attributes that can be referenced in HornetQ core + * filter expressions. + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public final class FilterConstants +{ + /** + * Name of the HornetQ UserID header. + */ + public static final SimpleString HORNETQ_USERID = new SimpleString("HQUserID"); + + /** + * Name of the HornetQ Message expiration header. + */ + public static final SimpleString HORNETQ_EXPIRATION = new SimpleString("HQExpiration"); + + /** + * Name of the HornetQ Message durable header. + */ + public static final SimpleString HORNETQ_DURABLE = new SimpleString("HQDurable"); + + /** + * Value for the Durable header when the message is non-durable. + */ + public static final SimpleString NON_DURABLE = new SimpleString("NON_DURABLE"); + + /** + * Value for the Durable header when the message is durable. + */ + public static final SimpleString DURABLE = new SimpleString("DURABLE"); + + /** + * Name of the HornetQ Message timestamp header. + */ + public static final SimpleString HORNETQ_TIMESTAMP = new SimpleString("HQTimestamp"); + + /** + * Name of the HornetQ Message priority header. + */ + public static final SimpleString HORNETQ_PRIORITY = new SimpleString("HQPriority"); + + /** + * Name of the HornetQ Message size header. + */ + public static final SimpleString HORNETQ_SIZE = new SimpleString("HQSize"); + + /** + * All HornetQ headers are prepended by this prefix. 
+ */ + public static final SimpleString HORNETQ_PREFIX = new SimpleString("HQ"); + + private FilterConstants() + { + // Utility class + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Interceptor.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Interceptor.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Interceptor.java new file mode 100644 index 0000000..d2a53ee --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Interceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * This is class is a simple way to intercepting calls on HornetQ client and servers. 
+ * <p> + * To add an interceptor to HornetQ server, you have to modify the server configuration file + * {@literal hornetq-configuration.xml}.<br> + * To add it to a client, use {@link org.apache.activemq6.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)} + * + * @author [email protected] + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface Interceptor +{ + /** + * Intercepts a packet which is received before it is sent to the channel + * + * @param packet the packet being received + * @param connection the connection the packet was received on + * @return {@code true} to process the next interceptor and handle the packet, + * {@code false} to abort processing of the packet + * @throws HornetQException + */ + boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/JGroupsBroadcastGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/JGroupsBroadcastGroupConfiguration.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/JGroupsBroadcastGroupConfiguration.java new file mode 100644 index 0000000..7ecbb93 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/JGroupsBroadcastGroupConfiguration.java @@ -0,0 +1,400 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.ReceiverAdapter; +import org.jgroups.conf.PlainConfigurator; + +/** + * The configuration for creating broadcasting/discovery groups using JGroups channels + * There are two ways to constructing a JGroups channel (JChannel): + * <ol> + * <li> by passing in a JGroups configuration file<br> + * The file must exists in the hornetq classpath. HornetQ creates a JChannel with the + * configuration file and use it for broadcasting and discovery. In standalone server + * mode HornetQ uses this way for constructing JChannels.</li> + * <li> by passing in a JChannel instance<br> + * This is useful when HornetQ needs to get a JChannel from a running JGroups service as in the + * case of AS7 integration.</li> + * </ol> + * <p> + * Note only one JChannel is needed in a VM. To avoid the channel being prematurely disconnected + * by any party, a wrapper class is used. 
+ * + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Howard Gao</a> + * @see JChannelWrapper, JChannelManager + */ +public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper +{ + private static final long serialVersionUID = 8952238567248461285L; + + private final BroadcastEndpointFactory factory; + + public JGroupsBroadcastGroupConfiguration(final String jgroupsFile, final String channelName) + { + factory = new BroadcastEndpointFactory() + { + private static final long serialVersionUID = 1047956472941098435L; + + @Override + public BroadcastEndpoint createBroadcastEndpoint() throws Exception + { + JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint(); + endpoint.initChannel(jgroupsFile, channelName); + return endpoint; + } + }; + } + + public JGroupsBroadcastGroupConfiguration(final JChannel channel, final String channelName) + { + factory = new BroadcastEndpointFactory() + { + private static final long serialVersionUID = 5110372849181145377L; + + @Override + public BroadcastEndpoint createBroadcastEndpoint() throws Exception + { + JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint(); + endpoint.initChannel(channel, channelName); + return endpoint; + } + }; + } + + @Override + public BroadcastEndpointFactory createBroadcastEndpointFactory() + { + return factory; + } + + @Override + public String getLocalBindAddress() + { + return null; + } + + @Override + /* + * return -1 to force deserialization of object + * */ + public int getLocalBindPort() + { + return -1; + } + + @Override + public String getGroupAddress() + { + return null; + } + + @Override + public int getGroupPort() + { + return -1; + } + + /** + * This class is the implementation of HornetQ members discovery that will use JGroups. 
+ * + * @author Howard Gao + */ + private static final class JGroupsBroadcastEndpoint implements BroadcastEndpoint + { + private boolean clientOpened; + + private boolean broadcastOpened; + + private JChannelWrapper<?> channel; + + private JGroupsReceiver receiver; + + public void broadcast(final byte[] data) throws Exception + { + if (broadcastOpened) + { + Message msg = new Message(); + + msg.setBuffer(data); + + channel.send(msg); + } + } + + public byte[] receiveBroadcast() throws Exception + { + if (clientOpened) + { + return receiver.receiveBroadcast(); + } + else + { + return null; + } + } + + public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception + { + if (clientOpened) + { + return receiver.receiveBroadcast(time, unit); + } + else + { + return null; + } + } + + public synchronized void openClient() throws Exception + { + if (clientOpened) + { + return; + } + internalOpen(); + receiver = new JGroupsReceiver(); + channel.setReceiver(receiver); + clientOpened = true; + } + + public synchronized void openBroadcaster() throws Exception + { + if (broadcastOpened) return; + internalOpen(); + broadcastOpened = true; + } + + private void initChannel(final String jgroupsConfig, final String channelName) throws Exception + { + PlainConfigurator configurator = new PlainConfigurator(jgroupsConfig); + try + { + this.channel = JChannelManager.getJChannel(channelName, configurator); + return; + } + catch (Exception e) + { + this.channel = null; + } + URL configURL = Thread.currentThread().getContextClassLoader().getResource(jgroupsConfig); + + if (configURL == null) + { + throw new RuntimeException("couldn't find JGroups configuration " + jgroupsConfig); + } + this.channel = JChannelManager.getJChannel(channelName, configURL); + } + + private void initChannel(final JChannel channel1, final String channelName) throws Exception + { + this.channel = JChannelManager.getJChannel(channelName, channel1); + } + + protected void internalOpen() throws Exception + 
{ + channel.connect(); + } + + public synchronized void close(boolean isBroadcast) throws Exception + { + if (isBroadcast) + { + broadcastOpened = false; + } + else + { + channel.removeReceiver(receiver); + clientOpened = false; + } + channel.close(); + } + + /** + * This class is used to receive messages from a JGroups channel. + * Incoming messages are put into a queue. + */ + private static final class JGroupsReceiver extends ReceiverAdapter + { + private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>(); + + @Override + public void receive(org.jgroups.Message msg) + { + dequeue.add(msg.getBuffer()); + } + + public byte[] receiveBroadcast() throws Exception + { + return dequeue.take(); + } + + public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception + { + return dequeue.poll(time, unit); + } + } + + /** + * This class wraps a JChannel with a reference counter. The reference counter + * controls the life of the JChannel. When reference count is zero, the channel + * will be disconnected. 
+ * + * @param <T> + */ + private static class JChannelWrapper<T> + { + int refCount = 1; + JChannel channel; + String channelName; + List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>(); + + public JChannelWrapper(String channelName, T t) throws Exception + { + this.refCount = 1; + this.channelName = channelName; + if (t instanceof URL) + { + this.channel = new JChannel((URL) t); + } + else if (t instanceof JChannel) + { + this.channel = (JChannel) t; + } + else if (t instanceof PlainConfigurator) + { + this.channel = new JChannel((PlainConfigurator)t); + } + else + { + throw new IllegalArgumentException("Unsupported type " + t); + } + } + + public synchronized void close() + { + refCount--; + if (refCount == 0) + { + JChannelManager.closeChannel(this.channelName, channel); + } + } + + public void removeReceiver(JGroupsReceiver receiver) + { + synchronized (receivers) + { + receivers.remove(receiver); + } + } + + public synchronized void connect() throws Exception + { + if (channel.isConnected()) return; + channel.setReceiver(new ReceiverAdapter() + { + + @Override + public void receive(Message msg) + { + synchronized (receivers) + { + for (JGroupsReceiver r : receivers) + { + r.receive(msg); + } + } + } + }); + channel.connect(channelName); + } + + public void setReceiver(JGroupsReceiver jGroupsReceiver) + { + synchronized (receivers) + { + receivers.add(jGroupsReceiver); + } + } + + public void send(Message msg) throws Exception + { + channel.send(msg); + } + + public JChannelWrapper<T> addRef() + { + this.refCount++; + return this; + } + + @Override + public String toString() + { + return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName; + } + } + + /** + * This class maintain a global Map of JChannels wrapped in JChannelWrapper for + * the purpose of reference counting. + * <p/> + * Wherever a JChannel is needed it should only get it by calling the getChannel() + * method of this class. 
The real disconnect of channels are also done here only. + */ + private static class JChannelManager + { + private static Map<String, JChannelWrapper<?>> channels; + + public static synchronized <T> JChannelWrapper<?> getJChannel(String channelName, T t) throws Exception + { + if (channels == null) + { + channels = new HashMap<String, JChannelWrapper<?>>(); + } + JChannelWrapper<?> wrapper = channels.get(channelName); + if (wrapper == null) + { + wrapper = new JChannelWrapper<T>(channelName, t); + channels.put(channelName, wrapper); + return wrapper; + } + return wrapper.addRef(); + } + + public static synchronized void closeChannel(String channelName, JChannel channel) + { + channel.setReceiver(null); + channel.disconnect(); + channel.close(); + JChannelWrapper<?> wrapper = channels.remove(channelName); + if (wrapper == null) + { + throw new IllegalStateException("Did not find channel " + channelName); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Message.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Message.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Message.java new file mode 100644 index 0000000..12b37c1 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/Message.java @@ -0,0 +1,549 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import java.util.Map; +import java.util.Set; + +import org.apache.activemq6.utils.UUID; + + +/** + * A Message is a routable instance that has a payload. + * <p> + * The payload (the "body") is opaque to the messaging system. A Message also has a fixed set of + * headers (required by the messaging system) and properties (defined by the users) that can be used + * by the messaging system to route the message (e.g. to ensure it matches a queue filter). + * <h2>Message Properties</h2> + * <p> + * Message can contain properties specified by the users. It is possible to convert from some types + * to other types as specified by the following table: + * <pre> + * | | boolean byte short int long float double String byte[] + * |---------------------------------------------------------------- + * |boolean | X X + * |byte | X X X X X + * |short | X X X X + * |int | X X X + * |long | X X + * |float | X X X + * |double | X X + * |String | X X X X X X X X + * |byte[] | X + * |----------------------------------------------------------------- + * </pre> + * <p> + * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a + * {@code boolean}), a {@link HornetQPropertyConversionException} will be thrown. 
+ * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">ClebertSuconic</a> + * @version <tt>$Revision: 3341 $</tt> $Id: Message.java 3341 2007-11-19 14:34:57Z timfox $ + */ +public interface Message +{ + SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("_HQ_ACTUAL_EXPIRY"); + + SimpleString HDR_ORIGINAL_ADDRESS = new SimpleString("_HQ_ORIG_ADDRESS"); + + SimpleString HDR_ORIGINAL_QUEUE = new SimpleString("_HQ_ORIG_QUEUE"); + + SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID"); + + SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID"); + + SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED"); + + SimpleString HDR_LARGE_BODY_SIZE = new SimpleString("_HQ_LARGE_SIZE"); + + SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY"); + + SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_HQ_DUPL_ID"); + + SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_HQ_LVQ_NAME"); + + byte DEFAULT_TYPE = 0; + + byte OBJECT_TYPE = 2; + + byte TEXT_TYPE = 3; + + byte BYTES_TYPE = 4; + + byte MAP_TYPE = 5; + + byte STREAM_TYPE = 6; + + /** + * Returns the messageID. + * <br> + * The messageID is set when the message is handled by the server. + */ + long getMessageID(); + + /** + * Returns the userID - this is an optional user-specified UUID that can be set to identify the message + * and will be passed around with the message. + * + * @return the user id + */ + UUID getUserID(); + + /** + * Sets the user ID. + * + * @param userID the user ID to set + */ + Message setUserID(UUID userID); + + /** + * Returns the address this message is sent to. + */ + SimpleString getAddress(); + + /** + * Sets the address to send this message to. + * + * @param address address to send the message to + */ + Message setAddress(SimpleString address); + + /** + * Returns this message type. + * <p> + * See fields {@literal *_TYPE} for possible values.
+ */ + byte getType(); + + /** + * Returns whether this message is durable or not. + */ + boolean isDurable(); + + /** + * Sets whether this message is durable or not. + * + * @param durable {@code true} to flag this message as durable, {@code false} otherwise + */ + Message setDurable(boolean durable); + + /** + * Returns the expiration time of this message. + */ + long getExpiration(); + + /** + * Returns whether this message is expired or not. + */ + boolean isExpired(); + + /** + * Sets the expiration of this message. + * + * @param expiration expiration time + */ + Message setExpiration(long expiration); + + /** + * Returns the message timestamp. + * <br> + * The timestamp corresponds to the time this message + * was handled by a HornetQ server. + */ + long getTimestamp(); + + /** + * Sets the message timestamp. + * + * @param timestamp the new message timestamp + */ + Message setTimestamp(long timestamp); + + /** + * Returns the message priority. + * <p> + * Values range from 0 (lowest priority) to 9 (highest priority) inclusive. + */ + byte getPriority(); + + /** + * Sets the message priority. + * <p> + * Value must be between 0 and 9 inclusive. + * + * @param priority the new message priority + */ + Message setPriority(byte priority); + + /** + * Returns the size of the <em>encoded</em> message. + */ + int getEncodeSize(); + + /** + * Returns whether this message is a <em>large message</em> or a regular message. + */ + boolean isLargeMessage(); + + /** + * Returns the message body as a HornetQBuffer. + */ + HornetQBuffer getBodyBuffer(); + + /** + * Writes the input byte array to the message body HornetQBuffer. + */ + Message writeBodyBufferBytes(byte[] bytes); + + /** + * Writes the input String to the message body HornetQBuffer. + */ + Message writeBodyBufferString(String string); + + /** + * Returns a <em>copy</em> of the message body as a HornetQBuffer. Any modification + * of this buffer should not impact the underlying buffer.
+ */ + HornetQBuffer getBodyBufferCopy(); + + // Properties + // ----------------------------------------------------------------- + + /** + * Puts a boolean property in this message. + * + * @param key property name + * @param value property value + */ + Message putBooleanProperty(SimpleString key, boolean value); + + /** + * @see #putBooleanProperty(SimpleString, boolean) + */ + Message putBooleanProperty(String key, boolean value); + + /** + * Puts a byte property in this message. + * + * @param key property name + * @param value property value + */ + Message putByteProperty(SimpleString key, byte value); + + /** + * @see #putByteProperty(SimpleString, byte) + */ + Message putByteProperty(String key, byte value); + + /** + * Puts a byte[] property in this message. + * + * @param key property name + * @param value property value + */ + Message putBytesProperty(SimpleString key, byte[] value); + + /** + * @see #putBytesProperty(SimpleString, byte[]) + */ + Message putBytesProperty(String key, byte[] value); + + /** + * Puts a short property in this message. + * + * @param key property name + * @param value property value + */ + Message putShortProperty(SimpleString key, short value); + + /** + * @see #putShortProperty(SimpleString, short) + */ + Message putShortProperty(String key, short value); + + /** + * Puts a char property in this message. + * + * @param key property name + * @param value property value + */ + Message putCharProperty(SimpleString key, char value); + + /** + * @see #putCharProperty(SimpleString, char) + */ + Message putCharProperty(String key, char value); + + /** + * Puts an int property in this message. + * + * @param key property name + * @param value property value + */ + Message putIntProperty(SimpleString key, int value); + + /** + * @see #putIntProperty(SimpleString, int) + */ + Message putIntProperty(String key, int value); + + /** + * Puts a long property in this message.
+ * + * @param key property name + * @param value property value + */ + Message putLongProperty(SimpleString key, long value); + + /** + * @see #putLongProperty(SimpleString, long) + */ + Message putLongProperty(String key, long value); + + /** + * Puts a float property in this message. + * + * @param key property name + * @param value property value + */ + Message putFloatProperty(SimpleString key, float value); + + /** + * @see #putFloatProperty(SimpleString, float) + */ + Message putFloatProperty(String key, float value); + + /** + * Puts a double property in this message. + * + * @param key property name + * @param value property value + */ + Message putDoubleProperty(SimpleString key, double value); + + /** + * @see #putDoubleProperty(SimpleString, double) + */ + Message putDoubleProperty(String key, double value); + + /** + * Puts a SimpleString property in this message. + * + * @param key property name + * @param value property value + */ + Message putStringProperty(SimpleString key, SimpleString value); + + /** + * Puts a String property in this message. + * + * @param key property name + * @param value property value + */ + Message putStringProperty(String key, String value); + + /** + * Puts an Object property in this message. <br> + * Accepted types are: + * <ul> + * <li>Boolean</li> + * <li>Byte</li> + * <li>Short</li> + * <li>Character</li> + * <li>Integer</li> + * <li>Long</li> + * <li>Float</li> + * <li>Double</li> + * <li>String</li> + * <li>SimpleString</li> + * </ul> + * Using any other type will throw a HornetQPropertyConversionException. + * + * @param key property name + * @param value property value + * @throws HornetQPropertyConversionException if the value is not one of the accepted property + * types.
+ */ + Message putObjectProperty(SimpleString key, Object value) throws HornetQPropertyConversionException; + + /** + * @see #putObjectProperty(SimpleString, Object) + */ + Message putObjectProperty(String key, Object value) throws HornetQPropertyConversionException; + + /** + * Removes the property corresponding to the specified key. + * + * @param key property name + * @return the value corresponding to the specified key or {@code null} + */ + Object removeProperty(SimpleString key); + + + /** + * @see #removeProperty(SimpleString) + */ + Object removeProperty(String key); + + /** + * Returns {@code true} if this message contains a property with the given key, {@code false} otherwise. + * + * @param key property name + */ + boolean containsProperty(SimpleString key); + + /** + * @see #containsProperty(SimpleString) + */ + boolean containsProperty(String key); + + /** + * Returns the property corresponding to the specified key as a Boolean. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a Boolean + */ + Boolean getBooleanProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getBooleanProperty(SimpleString) + */ + Boolean getBooleanProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a Byte. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a Byte + */ + Byte getByteProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getByteProperty(SimpleString) + */ + Byte getByteProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a Double.
+ * + * @throws HornetQPropertyConversionException if the value can not be converted to a Double + */ + Double getDoubleProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getDoubleProperty(SimpleString) + */ + Double getDoubleProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as an Integer. + * + * @throws HornetQPropertyConversionException if the value can not be converted to an Integer + */ + Integer getIntProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getIntProperty(SimpleString) + */ + Integer getIntProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a Long. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a Long + */ + Long getLongProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getLongProperty(SimpleString) + */ + Long getLongProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key. + */ + Object getObjectProperty(SimpleString key); + + /** + * @see #getObjectProperty(SimpleString) + */ + Object getObjectProperty(String key); + + /** + * Returns the property corresponding to the specified key as a Short. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a Short + */ + Short getShortProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getShortProperty(SimpleString) + */ + Short getShortProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a Float.
+ * + * @throws HornetQPropertyConversionException if the value can not be converted to a Float + */ + Float getFloatProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getFloatProperty(SimpleString) + */ + Float getFloatProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a String. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a String + */ + String getStringProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getStringProperty(SimpleString) + */ + String getStringProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a SimpleString. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a SimpleString + */ + SimpleString getSimpleStringProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getSimpleStringProperty(SimpleString) + */ + SimpleString getSimpleStringProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns the property corresponding to the specified key as a byte[]. + * + * @throws HornetQPropertyConversionException if the value can not be converted to a byte[] + */ + byte[] getBytesProperty(SimpleString key) throws HornetQPropertyConversionException; + + /** + * @see #getBytesProperty(SimpleString) + */ + byte[] getBytesProperty(String key) throws HornetQPropertyConversionException; + + /** + * Returns all the names of the properties for this message. + */ + Set<SimpleString> getPropertyNames(); + + /** + * @return the message in Map form, useful when encoding to JSON + */ + Map<String, Object> toMap(); +}
