Author: adc
Date: Sat Sep 10 20:47:30 2011
New Revision: 1167604
URL: http://svn.apache.org/viewvc?rev=1167604&view=rev
Log:
Sketched out group protocol
Added:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoEventQueue.java
- copied, changed from r1164949,
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoProperty.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/DownEvent.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Event.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/GroupMembershipFilter.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/UpEvent.java
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/EventMatcherUtil.java
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/FirstMemberGroupMembershipFilterTest.java
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/MessageMatcherUtil.java
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/SecondMemberGroupMembershipFilterTest.java
Modified:
mina/sandbox/adc/ahc/mina3/pom.xml
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/AcceptMessage.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/CommitMessage.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/LocalDetectorListener.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Member.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Message.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/ProposeMessage.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/RetryMessage.java
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/link/LinkStateFilter.java
mina/sandbox/adc/ahc/pom.xml
mina/sandbox/adc/ahc/test/src/main/java/org/apache/ahc/test/Utils.java
Modified: mina/sandbox/adc/ahc/mina3/pom.xml
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/pom.xml?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
--- mina/sandbox/adc/ahc/mina3/pom.xml (original)
+++ mina/sandbox/adc/ahc/mina3/pom.xml Sat Sep 10 20:47:30 2011
@@ -43,6 +43,11 @@
</dependency>
<dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ahc</groupId>
<artifactId>api</artifactId>
</dependency>
Copied:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoEventQueue.java
(from r1164949,
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoProperty.java)
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoEventQueue.java?p2=mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoEventQueue.java&p1=mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoProperty.java&r1=1164949&r2=1167604&rev=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoProperty.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/api/IoEventQueue.java
Sat Sep 10 20:47:30 2011
@@ -22,16 +22,12 @@ package org.apache.mina.core.api;
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
[email protected]({java.lang.annotation.ElementType.METHOD,
java.lang.annotation.ElementType.FIELD})
[email protected](java.lang.annotation.ElementType.FIELD)
@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
@java.lang.annotation.Documented
-public @interface IoProperty
+public @interface IoEventQueue
{
public String name() default "<property name>";
- public Class type() default IoProperty.class;
-
- public IoPropertyScope scope() default IoPropertyScope.PRIVATE;
-
- public boolean required() default true;
+ public Class type() default IoEventQueue.class;
}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/AcceptMessage.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/AcceptMessage.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/AcceptMessage.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/AcceptMessage.java
Sat Sep 10 20:47:30 2011
@@ -18,9 +18,33 @@
*/
package org.apache.mina.group;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class AcceptMessage extends Message
+public class AcceptMessage<ID extends Comparable<ID>> extends Message<ID>
{
+ private final long index;
+
+ public AcceptMessage(ID sender, long index)
+ {
+ super(sender);
+ this.index = index;
+ }
+
+ public long getIndex()
+ {
+ return index;
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this).
+ append("sender", getSender()).
+ append("index", index).
+ toString();
+ }
}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/CommitMessage.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/CommitMessage.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/CommitMessage.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/CommitMessage.java
Sat Sep 10 20:47:30 2011
@@ -18,9 +18,45 @@
*/
package org.apache.mina.group;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class CommitMessage extends Message
+public class CommitMessage<ID extends Comparable<ID>> extends Message<ID>
{
-}
+ private final Set<ID> view;
+ private final int index;
+
+ public CommitMessage(ID sender, Set<ID> view, int index)
+ {
+ super(sender);
+ this.view = Collections.unmodifiableSet(new HashSet<ID>(view));
+ this.index = index;
+ }
+
+ public Set<ID> getView()
+ {
+ return view;
+ }
+
+ public int getIndex()
+ {
+ return index;
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this).
+ append("sender", getSender()).
+ append("view", view).
+ append("index", index).
+ toString();
+ }
+}
\ No newline at end of file
Added:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/DownEvent.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/DownEvent.java?rev=1167604&view=auto
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/DownEvent.java
(added)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/DownEvent.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class DownEvent<ID extends Comparable<ID>> extends Event<ID>
+{
+ public DownEvent(ID id)
+ {
+ super(id);
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this).
+ append("id", getId()).
+ toString();
+ }
+}
Added: mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Event.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Event.java?rev=1167604&view=auto
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Event.java
(added)
+++ mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Event.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public abstract class Event<ID extends Comparable<ID>>
+{
+ private final ID id;
+
+ public Event(ID id)
+ {
+ this.id = id;
+ }
+
+ public ID getId()
+ {
+ return id;
+ }
+}
Added:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/GroupMembershipFilter.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/GroupMembershipFilter.java?rev=1167604&view=auto
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/GroupMembershipFilter.java
(added)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/GroupMembershipFilter.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.mina.core.AbstractIoFilter;
+import org.apache.mina.core.api.IoDown;
+import org.apache.mina.core.api.IoEvent;
+import org.apache.mina.core.api.IoEventQueue;
+import org.apache.mina.core.api.IoProperty;
+import org.apache.mina.core.api.IoUp;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class GroupMembershipFilter<ID extends Comparable<ID>> extends
AbstractIoFilter implements IoUp<Message<ID>>
+{
+ static final Logger LOG =
LoggerFactory.getLogger(GroupMembershipFilter.class);
+ @IoProperty private ID myId;
+ @IoProperty private Set<LocalDetectorListener<ID>> listeners = new
CopyOnWriteArraySet<LocalDetectorListener<ID>>();
+ @IoProperty private List<Set<ID>> history = new LinkedList<Set<ID>>();
+ @IoProperty private Set<ID> vp = new HashSet<ID>();
+ @IoProperty private Map<ID, IoDown<Message>> children;
+ @IoProperty private Map<ID, Set<ID>> v = new HashMap<ID, Set<ID>>();
+ @IoProperty private Map<ID, Integer> propIn = new HashMap<ID, Integer>();
+ @IoProperty private Map<ID, Boolean> prop = new HashMap<ID, Boolean>();
+ @IoProperty private Map<ID, Boolean> ack = new HashMap<ID, Boolean>();
+ @IoProperty private int propOut;
+ @IoProperty private int next;
+ @IoEventQueue private Queue<Event<ID>> events;
+
+ public GroupMembershipFilter(ID myId)
+ {
+ this.myId = myId;
+ }
+
+ public Map<ID, IoDown<Message>> getChildren()
+ {
+ return children;
+ }
+
+ public void setChildren(Map<ID, IoDown<Message>> children)
+ {
+ this.children = children;
+ }
+
+ public Queue<Event<ID>> getEvents()
+ {
+ return events;
+ }
+
+ public Set<LocalDetectorListener<ID>> getListeners()
+ {
+ return listeners;
+ }
+
+ @Override
+ public void init() throws Exception
+ {
+ propOut = 0;
+ next = 0;
+ }
+
+ @Override
+ public void receive(Message<ID> message) throws Exception
+ {
+ LOG.debug("received {}", message);
+
+ ID sender = message.getSender();
+ if (message instanceof ProposeMessage)
+ {
+ @SuppressWarnings({"unchecked"}) ProposeMessage<ID> proposeMessage
= (ProposeMessage<ID>)message;
+
+ v.put(sender, proposeMessage.getView());
+ propIn.put(sender, proposeMessage.getIndex());
+ prop.put(sender, true);
+ }
+ else if (message instanceof RetryMessage)
+ {
+ @SuppressWarnings({"unchecked"}) RetryMessage<ID> retryMessage =
(RetryMessage<ID>)message;
+
+ if (propOut == retryMessage.getOldIndex())
+ {
+ ID min = Collections.min(vp);
+ if (myId.equals(min))
+ {
+ propOut = retryMessage.getNewIndex();
+ for (ID id : vp)
+ {
+ children.get(id).send(new ProposeMessage<ID>(myId, vp,
propOut));
+ }
+ ack.clear();
+ }
+ }
+ }
+ else if (message instanceof AcceptMessage)
+ {
+ @SuppressWarnings({"unchecked"}) AcceptMessage<ID> acceptMessage =
(AcceptMessage<ID>)message;
+ if (propOut == acceptMessage.getIndex()) ack.put(sender, true);
+ boolean full = true;
+ for (ID id : vp)
+ {
+ full = full && Boolean.TRUE.equals(ack.get(id));
+ if (!full) break;
+ }
+ if (full)
+ {
+ for (ID id : vp)
+ {
+ children.get(id).send(new CommitMessage<ID>(myId, vp,
propOut));
+ }
+ ack.clear();
+ }
+ }
+ else if (message instanceof CommitMessage)
+ {
+ @SuppressWarnings({"unchecked"}) CommitMessage<ID> commitMessage =
(CommitMessage<ID>)message;
+ history.add(commitMessage.getView());
+ for (LocalDetectorListener<ID> listener : listeners)
+ {
+ try
+ {
+ listener.changed(commitMessage.getView());
+ }
+ catch (Throwable t)
+ {
+ LOG.warn("Listener " + listener + " threw exception", t);
+ }
+ }
+ }
+
+ if (Boolean.TRUE.equals(prop.get(sender)) && vp.equals(v.get(sender)))
+ {
+ prop.put(sender, false);
+ if (propIn.get(sender) < next)
+ {
+ children.get(sender).send(new RetryMessage<ID>(myId,
propIn.get(sender), next));
+ }
+ else
+ {
+ next = propIn.get(sender) + 1;
+ children.get(sender).send(new AcceptMessage<ID>(myId,
propIn.get(sender)));
+ }
+ }
+ }
+
+ public void up(ID id)
+ {
+ events.add(new UpEvent<ID>(id));
+ }
+
+ public void down(ID id)
+ {
+ events.add(new DownEvent<ID>(id));
+ }
+
+ @IoEvent(name = "up", type = UpEvent.class)
+ protected void handle(UpEvent<ID> event) throws Exception
+ {
+ LOG.debug("Received up event {}", event);
+
+ vp.add(event.getId());
+ groupChange();
+ }
+
+ @IoEvent(name = "down", type = DownEvent.class)
+ protected void handle(DownEvent<ID> event) throws Exception
+ {
+ LOG.debug("Received down event {}", event);
+
+ vp.remove(event.getId());
+ groupChange();
+ }
+
+ protected void groupChange() throws Exception
+ {
+ ID min = Collections.min(vp);
+ if (myId.equals(min))
+ {
+ propOut++;
+ ack.clear();
+ for (ID id : vp)
+ {
+ children.get(id).send(new ProposeMessage<ID>(myId, vp,
propOut));
+ }
+ }
+ }
+}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/LocalDetectorListener.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/LocalDetectorListener.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/LocalDetectorListener.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/LocalDetectorListener.java
Sat Sep 10 20:47:30 2011
@@ -18,13 +18,13 @@
*/
package org.apache.mina.group;
-import java.util.List;
+import java.util.Set;
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public interface LocalDetectorListener<T>
+public interface LocalDetectorListener<ID extends Comparable<ID>>
{
- public void changed(List<Member<T>> membership);
+ public void changed(Set<ID> membership);
}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Message.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Message.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Message.java
(original)
+++ mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/Message.java
Sat Sep 10 20:47:30 2011
@@ -21,6 +21,18 @@ package org.apache.mina.group;
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public abstract class Message
+public abstract class Message<ID extends Comparable<ID>>
{
+ private final ID sender;
+
+ protected Message(ID sender)
+ {
+ assert sender != null;
+ this.sender = sender;
+ }
+
+ public ID getSender()
+ {
+ return sender;
+ }
}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/ProposeMessage.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/ProposeMessage.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/ProposeMessage.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/ProposeMessage.java
Sat Sep 10 20:47:30 2011
@@ -18,9 +18,45 @@
*/
package org.apache.mina.group;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class ProposeMessage extends Message
+public class ProposeMessage<ID extends Comparable<ID>> extends Message<ID>
{
+ private final Set<ID> view;
+ private final int index;
+
+ public ProposeMessage(ID sender, Set<ID> view, int index)
+ {
+ super(sender);
+ this.view = Collections.unmodifiableSet(new HashSet<ID>(view));
+ this.index = index;
+ }
+
+ public Set<ID> getView()
+ {
+ return view;
+ }
+
+ public int getIndex()
+ {
+ return index;
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this).
+ append("sender", getSender()).
+ append("view", view).
+ append("index", index).
+ toString();
+ }
}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/RetryMessage.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/RetryMessage.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/RetryMessage.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/RetryMessage.java
Sat Sep 10 20:47:30 2011
@@ -18,9 +18,41 @@
*/
package org.apache.mina.group;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class RetryMessage extends Message
+public class RetryMessage<ID extends Comparable<ID>> extends Message<ID>
{
+ private int oldIndex;
+ private int newIndex;
+
+ public RetryMessage(ID sender, int oldIndex, int newIndex)
+ {
+ super(sender);
+ this.oldIndex = oldIndex;
+ this.newIndex = newIndex;
+ }
+
+ public int getOldIndex()
+ {
+ return oldIndex;
+ }
+
+ public int getNewIndex()
+ {
+ return newIndex;
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this).
+ append("sender", getSender()).
+ append("oldIndex", oldIndex).
+ append("newIndex", newIndex).
+ toString();
+ }
}
Added:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/UpEvent.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/UpEvent.java?rev=1167604&view=auto
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/UpEvent.java
(added)
+++ mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/group/UpEvent.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class UpEvent<ID extends Comparable<ID>> extends Event<ID>
+{
+ public UpEvent(ID id)
+ {
+ super(id);
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this).
+ append("id", getId()).
+ toString();
+ }
+}
Modified:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/link/LinkStateFilter.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/link/LinkStateFilter.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/link/LinkStateFilter.java
(original)
+++
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/link/LinkStateFilter.java
Sat Sep 10 20:47:30 2011
@@ -18,6 +18,7 @@
*/
package org.apache.mina.link;
+import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
@@ -44,7 +45,7 @@ public class LinkStateFilter extends Sta
static final Logger LOG = LoggerFactory.getLogger(LinkStateFilter.class);
@IoProperty protected IoDown<Token> child;
@IoProperty protected int numTokens;
- @IoProperty(scope = IoPropertyScope.SESSION) protected Set<LinkListener>
listeners;
+ @IoProperty(scope = IoPropertyScope.SESSION) protected Set<LinkListener>
listeners = new HashSet<LinkListener>();
public LinkStateFilter()
{
@@ -62,6 +63,11 @@ public class LinkStateFilter extends Sta
this.child = child;
}
+ public Set<LinkListener> getListeners()
+ {
+ return listeners;
+ }
+
/**
* {@inheritDoc}
*/
Added:
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/EventMatcherUtil.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/EventMatcherUtil.java?rev=1167604&view=auto
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/EventMatcherUtil.java
(added)
+++
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/EventMatcherUtil.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class EventMatcherUtil
+{
+ public static <ID extends Comparable<ID>> Matcher<DownEvent<ID>>
matchesDownEvent(final ID member)
+ {
+ return new BaseMatcher<DownEvent<ID>>()
+ {
+ @Override
+ @SuppressWarnings({"unchecked"})
+ public boolean matches(Object o)
+ {
+ if (!(o instanceof DownEvent)) return false;
+ DownEvent<ID> event = (DownEvent<ID>)o;
+ return event.getId().equals(member);
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ }
+ };
+ }
+
+ public static <ID extends Comparable<ID>> Matcher<UpEvent<ID>>
matchesUpEvent(final ID member)
+ {
+ return new BaseMatcher<UpEvent<ID>>()
+ {
+ @Override
+ @SuppressWarnings({"unchecked"})
+ public boolean matches(Object o)
+ {
+ if (!(o instanceof UpEvent)) return false;
+ UpEvent<ID> event = (UpEvent<ID>)o;
+ return event.getId().equals(member);
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ }
+ };
+ }
+
+ private EventMatcherUtil() {}
+}
Added:
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/FirstMemberGroupMembershipFilterTest.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/FirstMemberGroupMembershipFilterTest.java?rev=1167604&view=auto
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/FirstMemberGroupMembershipFilterTest.java
(added)
+++
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/FirstMemberGroupMembershipFilterTest.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.ahc.test.Utils.asSet;
+import static org.apache.ahc.test.Utils.inject;
+import static org.apache.mina.group.EventMatcherUtil.matchesDownEvent;
+import static org.apache.mina.group.EventMatcherUtil.matchesUpEvent;
+import static org.apache.mina.group.MessageMatcherUtil.matchesAcceptMessage;
+import static org.apache.mina.group.MessageMatcherUtil.matchesCommitMessage;
+import static org.apache.mina.group.MessageMatcherUtil.matchesProposeMessage;
+import static org.apache.mina.group.MessageMatcherUtil.matchesRetryMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Matchers;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.apache.mina.core.api.IoDown;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class FirstMemberGroupMembershipFilterTest
+{
+ private Queue<Event<Integer>> events;
+ private LocalDetectorListener<Integer> listener;
+ private IoDown<Message> one;
+ private IoDown<Message> two;
+ private IoDown<Message> three;
+ private IoDown<Message> four;
+ private IoDown<Message> five;
+ private Set<Integer> vp = new HashSet<Integer>();
+ private GroupMembershipFilter<Integer> filter;
+
+ @Test
+ public void initialDownEvent() throws Exception
+ {
+ // filter gets notified that link to member 3 is down
+ filter.down(3);
+
+ // make sure event is queued to filter, nothing else should happen
+ // since the framework should call the event handler
+ verify(events).add(argThat(matchesDownEvent(3)));
+ verify(one, never()).send(Matchers.<Message>any());
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void initialUpEvent() throws Exception
+ {
+ // filter gets notified that link to member 3 is up
+ filter.up(3);
+
+ // make sure event is queued to filter, nothing else should happen
+ // since the framework should call the event handler
+ verify(events).add(argThat(matchesUpEvent(3)));
+ verify(one, never()).send(Matchers.<Message>any());
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void initialDownEventDelivery() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 3, 4, 5));
+
+ // simulate framework delivering the down event
+ filter.handle(new DownEvent<Integer>(3));
+
+ verify(one).send(argThat(matchesProposeMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(two).send(argThat(matchesProposeMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four).send(argThat(matchesProposeMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(five).send(argThat(matchesProposeMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void initialUpEventDelivery() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+
+ // simulate framework delivering the down event
+ filter.handle(new UpEvent<Integer>(3));
+
+ verify(one).send(argThat(matchesProposeMessage(1, asSet(1, 2, 3, 4,
5), 1)));
+ verify(two).send(argThat(matchesProposeMessage(1, asSet(1, 2, 3, 4,
5), 1)));
+ verify(three).send(argThat(matchesProposeMessage(1, asSet(1, 2, 3, 4,
5), 1)));
+ verify(four).send(argThat(matchesProposeMessage(1, asSet(1, 2, 3, 4,
5), 1)));
+ verify(five).send(argThat(matchesProposeMessage(1, asSet(1, 2, 3, 4,
5), 1)));
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void initialProposeMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+
+ // simulate framework delivering propose message
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void generateCommitMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+ inject(filter, "propOut", 1);
+
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+
+ filter.receive(new AcceptMessage<Integer>(1, 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+
+ filter.receive(new AcceptMessage<Integer>(2, 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+
+ filter.receive(new AcceptMessage<Integer>(4, 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+
+ filter.receive(new AcceptMessage<Integer>(5, 1));
+
+ InOrder order = inOrder(one);
+ order.verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ order.verify(one).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4,
5), 1)));
+ verify(two).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(five).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void receiveCommitMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+ inject(filter, "propOut", 1);
+
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+ filter.receive(new AcceptMessage<Integer>(1, 1));
+ filter.receive(new AcceptMessage<Integer>(2, 1));
+ filter.receive(new AcceptMessage<Integer>(4, 1));
+ filter.receive(new AcceptMessage<Integer>(5, 1));
+ filter.receive(new CommitMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ InOrder order = inOrder(one);
+ order.verify(one).send(argThat(matchesAcceptMessage(1, 1)));
+ order.verify(one).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4,
5), 1)));
+ verify(two).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(five).send(argThat(matchesCommitMessage(1, asSet(1, 2, 4, 5),
1)));
+ verify(listener).changed(asSet(1, 2, 4, 5));
+ }
+
+ @Test
+ public void coordinatorBehindProposeMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+ inject(filter, "next", 15);
+
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ verify(one).send(argThat(matchesRetryMessage(1, 1, 15)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Before
+ @SuppressWarnings({"unchecked"})
+ public void setUp() throws Exception
+ {
+ events = mock(Queue.class);
+ listener = mock(LocalDetectorListener.class);
+ one = mock(IoDown.class);
+ two = mock(IoDown.class);
+ three = mock(IoDown.class);
+ four = mock(IoDown.class);
+ five = mock(IoDown.class);
+
+ Map<Integer, IoDown<Message>> children = new HashMap<Integer,
IoDown<Message>>();
+ children.put(1, one);
+ children.put(2, two);
+ children.put(3, three);
+ children.put(4, four);
+ children.put(5, five);
+
+ filter = new GroupMembershipFilter<Integer>(1);
+ vp.clear();
+
+ filter.setChildren(children);
+ inject(filter, "events", events);
+ inject(filter, "vp", vp);
+ filter.init();
+ filter.getListeners().add(listener);
+ }
+}
Added:
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/MessageMatcherUtil.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/MessageMatcherUtil.java?rev=1167604&view=auto
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/MessageMatcherUtil.java
(added)
+++
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/MessageMatcherUtil.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import java.util.Set;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class MessageMatcherUtil
+{
+ public static <ID extends Comparable<ID>> BaseMatcher<ProposeMessage<ID>>
matchesProposeMessage(final ID sender, final Set<ID> view, final int index)
+ {
+ return new BaseMatcher<ProposeMessage<ID>>()
+ {
+ @Override
+ public boolean matches(Object o)
+ {
+ if (!(o instanceof ProposeMessage)) return false;
+
+ ProposeMessage message = (ProposeMessage)o;
+
+ return sender.equals(message.getSender()) &&
view.equals(message.getView()) && index == message.getIndex();
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ }
+ };
+ }
+
+ public static <ID extends Comparable<ID>> BaseMatcher<AcceptMessage<ID>>
matchesAcceptMessage(final ID sender, final int index)
+ {
+ return new BaseMatcher<AcceptMessage<ID>>()
+ {
+ @Override
+ public boolean matches(Object o)
+ {
+ if (!(o instanceof AcceptMessage)) return false;
+
+ AcceptMessage message = (AcceptMessage)o;
+
+ return sender.equals(message.getSender()) && index ==
message.getIndex();
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ }
+ };
+ }
+
+ public static <ID extends Comparable<ID>> BaseMatcher<CommitMessage<ID>>
matchesCommitMessage(final ID sender, final Set<ID> view, final int index)
+ {
+ return new BaseMatcher<CommitMessage<ID>>()
+ {
+ @Override
+ public boolean matches(Object o)
+ {
+ if (!(o instanceof CommitMessage)) return false;
+
+ CommitMessage message = (CommitMessage)o;
+
+ return sender.equals(message.getSender()) &&
view.equals(message.getView()) && index == message.getIndex();
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ }
+ };
+ }
+
+ public static <ID extends Comparable<ID>> BaseMatcher<RetryMessage<ID>>
matchesRetryMessage(final ID sender, final int oldIndex, final int newIndex)
+ {
+ return new BaseMatcher<RetryMessage<ID>>()
+ {
+ @Override
+ public boolean matches(Object o)
+ {
+ if (!(o instanceof RetryMessage)) return false;
+
+ RetryMessage message = (RetryMessage)o;
+
+ return sender.equals(message.getSender()) && oldIndex ==
message.getOldIndex() && newIndex == message.getNewIndex();
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ }
+ };
+ }
+ private MessageMatcherUtil() {}
+}
Added:
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/SecondMemberGroupMembershipFilterTest.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/SecondMemberGroupMembershipFilterTest.java?rev=1167604&view=auto
==============================================================================
---
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/SecondMemberGroupMembershipFilterTest.java
(added)
+++
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/group/SecondMemberGroupMembershipFilterTest.java
Sat Sep 10 20:47:30 2011
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.group;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.ahc.test.Utils.asSet;
+import static org.apache.ahc.test.Utils.inject;
+import static org.apache.mina.group.EventMatcherUtil.matchesDownEvent;
+import static org.apache.mina.group.MessageMatcherUtil.matchesAcceptMessage;
+import static org.apache.mina.group.MessageMatcherUtil.matchesCommitMessage;
+import static org.apache.mina.group.MessageMatcherUtil.matchesProposeMessage;
+import static org.apache.mina.group.MessageMatcherUtil.matchesRetryMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Matchers;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.apache.mina.core.api.IoDown;
+
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class SecondMemberGroupMembershipFilterTest
+{
+ private LocalDetectorListener<Integer> listener;
+ private IoDown<Message> one;
+ private IoDown<Message> two;
+ private IoDown<Message> three;
+ private IoDown<Message> four;
+ private IoDown<Message> five;
+ private Set<Integer> vp = new HashSet<Integer>();
+ private GroupMembershipFilter<Integer> filter;
+
+ @Test
+ public void initialDownEventDelivery() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 3, 4, 5));
+
+ // simulate framework delivering the down event
+ filter.handle(new DownEvent<Integer>(3));
+
+ verify(one, never()).send(Matchers.<Message>any());
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void receiveProposeMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+
+ // simulate framework delivering propose message
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(2, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Test
+ public void receiveCommitMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+ filter.receive(new CommitMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ verify(one).send(argThat(matchesAcceptMessage(2, 1)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener).changed(asSet(1, 2, 4, 5));
+ }
+
+ @Test
+ public void coordinatorBehindProposeMessage() throws Exception
+ {
+ vp.addAll(asSet(1, 2, 4, 5));
+ inject(filter, "next", 15);
+
+ filter.receive(new ProposeMessage<Integer>(1, asSet(1, 2, 4, 5), 1));
+
+ verify(one).send(argThat(matchesRetryMessage(2, 1, 15)));
+ verify(two, never()).send(Matchers.<Message>any());
+ verify(three, never()).send(Matchers.<Message>any());
+ verify(four, never()).send(Matchers.<Message>any());
+ verify(five, never()).send(Matchers.<Message>any());
+ verify(listener, never()).changed(Matchers.<Set<Integer>>any());
+ }
+
+ @Before
+ @SuppressWarnings({"unchecked"})
+ public void setUp() throws Exception
+ {
+ Queue<Event<Integer>> events = mock(Queue.class);
+ listener = mock(LocalDetectorListener.class);
+ one = mock(IoDown.class);
+ two = mock(IoDown.class);
+ three = mock(IoDown.class);
+ four = mock(IoDown.class);
+ five = mock(IoDown.class);
+
+ Map<Integer, IoDown<Message>> children = new HashMap<Integer,
IoDown<Message>>();
+ children.put(1, one);
+ children.put(2, two);
+ children.put(3, three);
+ children.put(4, four);
+ children.put(5, five);
+
+ filter = new GroupMembershipFilter<Integer>(2);
+ vp.clear();
+
+ filter.setChildren(children);
+ inject(filter, "events", events);
+ inject(filter, "vp", vp);
+ filter.init();
+ filter.getListeners().add(listener);
+ }
+}
Modified: mina/sandbox/adc/ahc/pom.xml
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/pom.xml?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
--- mina/sandbox/adc/ahc/pom.xml (original)
+++ mina/sandbox/adc/ahc/pom.xml Sat Sep 10 20:47:30 2011
@@ -94,6 +94,12 @@
</dependency>
<dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>20030203.000129</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ahc</groupId>
<artifactId>api</artifactId>
<version>${project.version}</version>
Modified: mina/sandbox/adc/ahc/test/src/main/java/org/apache/ahc/test/Utils.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/test/src/main/java/org/apache/ahc/test/Utils.java?rev=1167604&r1=1167603&r2=1167604&view=diff
==============================================================================
--- mina/sandbox/adc/ahc/test/src/main/java/org/apache/ahc/test/Utils.java
(original)
+++ mina/sandbox/adc/ahc/test/src/main/java/org/apache/ahc/test/Utils.java Sat
Sep 10 20:47:30 2011
@@ -19,6 +19,9 @@
package org.apache.ahc.test;
import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
/**
@@ -90,5 +93,10 @@ public class Utils
}
}
+ public static <T> Set<T> asSet(T... args)
+ {
+ return new HashSet<T>(Arrays.asList(args));
+ }
+
private Utils() { }
}