Author: marrs
Date: Fri Oct 17 11:52:59 2014
New Revision: 1632545
URL: http://svn.apache.org/r1632545
Log:
ACE-486 Added a put() method to create a new event. Also extended tests. Did
not implement this for the MongoDB backed store.
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java?rev=1632545&r1=1632544&r2=1632545&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
Fri Oct 17 11:52:59 2014
@@ -19,6 +19,7 @@
package org.apache.ace.log.server.store;
import java.io.IOException;
+import java.util.Dictionary;
import java.util.List;
import org.apache.ace.feedback.Descriptor;
@@ -101,4 +102,15 @@ public interface LogStore
* @throws IOException in case of any error
*/
public void clean() throws IOException;
+
+ /**
+ * Create a new event out of the given type and properties. Write it to
the store and return it.
+ *
+ * @param targetID the targetID of this event.
+ * @param type the type of the event.
+ * @param props the properties of the event.
+ * @return the created event that has been persisted.
+ * @throws java.io.IOException in case of any IO error.
+ */
+ public Event put(String targetID, int type, Dictionary props) throws
IOException;
}
\ No newline at end of file
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java?rev=1632545&r1=1632544&r2=1632545&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
Fri Oct 17 11:52:59 2014
@@ -28,6 +28,7 @@ import java.io.UnsupportedEncodingExcept
import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
@@ -36,7 +37,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.LockSupport;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
@@ -144,12 +144,21 @@ public class LogStoreImpl implements Log
}
public Descriptor getDescriptor(String targetID, long logID) throws
IOException {
+ return getDescriptorInternal(targetID, logID, true);
+ }
+
+ private Descriptor getDescriptorInternal(String targetID, long logID)
throws IOException {
+ return getDescriptorInternal(targetID, logID, false);
+ }
+
+ private Descriptor getDescriptorInternal(String targetID, long logID,
boolean lock) throws IOException {
Long high = m_fileToID.get(new File(new File(m_dir,
targetIDToFilename(targetID)), String.valueOf(logID)).getAbsolutePath());
if (high != null) {
Range r = new Range(1, high);
return new Descriptor(targetID, logID, new
SortedRangeSet(r.toRepresentation()));
}
- List<Event> events = get(new Descriptor(targetID, logID,
SortedRangeSet.FULL_SET));
+ Descriptor descriptor = new Descriptor(targetID, logID,
SortedRangeSet.FULL_SET);
+ List<Event> events = lock ? get(descriptor) :
getInternal(descriptor);
long[] idsArray = new long[events.size()];
int i = 0;
@@ -480,4 +489,43 @@ public class LogStoreImpl implements Log
lockedLogs.remove(logID);
}
}
+
+ @Override
+ public Event put(String targetID, int type, Dictionary dict) throws
IOException {
+ Map<String, String> props = new HashMap<String, String>();
+ Enumeration keys = dict.keys();
+ while (keys.hasMoreElements()) {
+ String key = (String) keys.nextElement();
+ props.put(key, (String) dict.get(key));
+ }
+ List<Descriptor> descriptors = getDescriptors(targetID);
+ // sort and pick highest
+ Descriptor descriptor = null;
+ long highest = 0;
+ for (Descriptor d : descriptors) {
+ if (d.getStoreID() > highest) {
+ highest = d.getStoreID();
+ descriptor = d;
+ }
+ }
+ // check if we found a descriptor, if not we need to create one
+ if (descriptor == null) {
+ descriptor = new Descriptor(targetID,
System.currentTimeMillis(), new SortedRangeSet(""));
+ }
+ long storeID = descriptor.getStoreID();
+ obtainLock(targetID, storeID);
+ try {
+ // re-fetch within the lock
+ descriptor = getDescriptorInternal(targetID, storeID);
+ long high = descriptor.getRangeSet().getHigh();
+ Event result = new Event(targetID, storeID, high + 1,
System.currentTimeMillis(), type, props);
+ List<Event> list = new ArrayList<>();
+ list.add(result);
+ put(targetID, storeID, list);
+ return result;
+ }
+ finally {
+ releaseLock(targetID, storeID);
+ }
+ }
}
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java?rev=1632545&r1=1632544&r2=1632545&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
Fri Oct 17 11:52:59 2014
@@ -185,6 +185,12 @@ public class MongoLogStore implements Lo
@Override
public void clean() throws IOException {
- // TODO : if m_max_events > 0 then remove all events from the mongo
store where there are more than m_maxEvents
+ // TODO if m_max_events > 0 then remove all events from the mongo
store where there are more than m_maxEvents
+ }
+
+ @Override
+ public Event put(String targetID, int type, Dictionary props) throws
IOException {
+ // TODO add an event to the appropriate store
+ return null;
}
}
Modified:
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java?rev=1632545&r1=1632544&r2=1632545&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
(original)
+++
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
Fri Oct 17 11:52:59 2014
@@ -23,6 +23,7 @@ import static org.apache.ace.test.utils.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.Iterator;
import java.util.List;
@@ -147,6 +148,10 @@ public class LogServletTest {
public void clean() throws IOException {
throw new UnsupportedOperationException("not implemented");
}
+ @Override
+ public Event put(String targetID, int type, Dictionary props) throws
IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
}
private class MockServletOutputStream extends ServletOutputStream {
Modified:
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java?rev=1632545&r1=1632544&r2=1632545&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
(original)
+++
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
Fri Oct 17 11:52:59 2014
@@ -100,6 +100,39 @@ public class ServerLogStoreTester {
assert in.equals(out) : "Stored events differ from the added.";
}
+ @Test(groups = { UNIT })
+ public void testCreateLogMessagesConcurrently() throws Exception {
+ final Properties props = new Properties();
+ props.put("test", "bar");
+
+ List<Descriptor> ranges = m_logStore.getDescriptors();
+ assert ranges.isEmpty() : "New store should have no ranges.";
+ ExecutorService exec = Executors.newFixedThreadPool(10);
+ for (final String target : new String[] { "g1", "g2", "g3", "g4",
"g5", "g6", "g7", "g8", "g9", "g10" }) {
+ exec.execute(new Runnable() { public void run() {
+ for (long id = 0; id < 1000; id++) {
+ try {
+ m_logStore.put(target, 1,
props);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };} );
+ }
+ exec.shutdown();
+ exec.awaitTermination(10, TimeUnit.SECONDS);
+ assert m_logStore.getDescriptors().size() == 10 : "Incorrect amount of
ranges returned from store: " + m_logStore.getDescriptors().size();
+ List<Event> stored = new ArrayList<Event>();
+ for (Descriptor range : m_logStore.getDescriptors()) {
+ for (Descriptor range2 :
m_logStore.getDescriptors(range.getTargetID())) {
+
stored.addAll(m_logStore.get(m_logStore.getDescriptor(range2.getTargetID(),
range2.getStoreID())));
+ }
+ }
+ assert stored.size() == 10000 : "Incorrect number of events got
stored: " + stored.size();
+ }
+
+
@Test(groups = { TestUtils.UNIT })
public void testLogWithSpecialCharacters() throws IOException {
String targetID = "myta\0rget";