Author: marrs
Date: Wed Oct 29 13:53:39 2014
New Revision: 1635133
URL: http://svn.apache.org/r1635133
Log:
ACE-490 ACE-491 Initial implementation and tests of setting the lowest ID to
store in a log store. Also for synchronizing such settings.
Added:
ace/trunk/org.apache.ace.feedback.common/src/org/apache/ace/feedback/LowestID.java
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/LogSync.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/servlet/LogServlet.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/task/LogTaskTest.java
Added:
ace/trunk/org.apache.ace.feedback.common/src/org/apache/ace/feedback/LowestID.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.feedback.common/src/org/apache/ace/feedback/LowestID.java?rev=1635133&view=auto
==============================================================================
---
ace/trunk/org.apache.ace.feedback.common/src/org/apache/ace/feedback/LowestID.java
(added)
+++
ace/trunk/org.apache.ace.feedback.common/src/org/apache/ace/feedback/LowestID.java
Wed Oct 29 13:53:39 2014
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.feedback;
+
+import java.util.NoSuchElementException;
+import java.util.StringTokenizer;
+
+import org.apache.ace.feedback.util.Codec;
+
+/**
+ * Instances of this class represent a lowest ID for a specific target and
store ID.
+ */
+public class LowestID {
+ private final String m_targetID;
+ private final long m_storeID;
+ private final long m_lowestID;
+
+ public LowestID(String targetID, long storeID, long lowestID) {
+ m_targetID = targetID;
+ m_storeID = storeID;
+ m_lowestID = lowestID;
+ }
+
+ public LowestID(String representation) {
+ try {
+ StringTokenizer st = new StringTokenizer(representation, ",");
+ m_targetID = Codec.decode(st.nextToken());
+ m_storeID = Long.parseLong(st.nextToken());
+ m_lowestID = Long.parseLong(st.nextToken());
+ }
+ catch (NoSuchElementException e) {
+ throw new IllegalArgumentException("Could not create lowest ID
object from: " + representation);
+ }
+ }
+
+ public String getTargetID() {
+ return m_targetID;
+ }
+
+ public long getStoreID() {
+ return m_storeID;
+ }
+
+ public long getLowestID() {
+ return m_lowestID;
+ }
+
+ public String toRepresentation() {
+ StringBuffer result = new StringBuffer();
+ result.append(Codec.encode(m_targetID));
+ result.append(',');
+ result.append(m_storeID);
+ result.append(',');
+ result.append(m_lowestID);
+ return result.toString();
+ }
+}
Modified: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/LogSync.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/LogSync.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/LogSync.java (original)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/LogSync.java Wed Oct 29
13:53:39 2014
@@ -53,6 +53,13 @@ public interface LogSync
* <code>false</code> otherwise.
*/
public boolean pushpull() throws IOException;
+
+ /** Pushes lowest IDs to remote repository. */
+ public boolean pushIDs() throws IOException;
+ /** Pulls lowest IDs from remote repository. */
+ public boolean pullIDs() throws IOException;
+ /** Pushes and pulls lowest IDs to/from remote repository. */
+ public boolean pushpullIDs() throws IOException;
/**
* Returns the name of the log 'channel' this log sync task is assigned to.
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/servlet/LogServlet.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/servlet/LogServlet.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/servlet/LogServlet.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/servlet/LogServlet.java
Wed Oct 29 13:53:39 2014
@@ -36,6 +36,7 @@ import javax.servlet.http.HttpServletRes
import org.apache.ace.authentication.api.AuthenticationService;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
+import org.apache.ace.feedback.LowestID;
import org.apache.ace.log.server.store.LogStore;
import org.apache.ace.range.SortedRangeSet;
import org.osgi.service.log.LogService;
@@ -57,6 +58,10 @@ import org.osgi.service.useradmin.User;
* http://host:port/auditlog/receive - Return all known events
* http://host:port/auditlog/receive?tid=myid - Return all known events
belonging to the specified target ID
* http://host:port/auditlog/receive?tid=myid&logid=2374623874 - Return all
known events belonging to the specified target ID
+ *
+ * Similarly, you can also send/receive lowest IDs for the logs:
+ * http://host:port/auditlog/sendids
+ * http://host:port/auditlog/receiveids
*
* If the request is not correctly formatted or other problems arise error
code <code>HttpServletResponse.SC_NOT_FOUND</code> will be sent in the response.
*/
@@ -71,6 +76,8 @@ public class LogServlet extends HttpServ
private static final String QUERY = "/query";
private static final String SEND = "/send";
private static final String RECEIVE = "/receive";
+ private static final String SEND_IDS = "/sendids";
+ private static final String RECEIVE_IDS = "/receiveids";
// url parameter keys
private static final String TARGETID_KEY = "tid";
@@ -100,16 +107,17 @@ public class LogServlet extends HttpServ
if (SEND.equals(path) && !handleSend(request.getInputStream())) {
sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Could
not construct a log event for all events received");
}
+ else if (SEND_IDS.equals(path) &&
!handleSendIDs(request.getInputStream())) {
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Could
not set lowest IDs for all logs received");
+ }
}
catch (IOException e) {
- sendError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Error processing received log events");
+ sendError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Error processing post request");
}
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse
response) {
- // 'query' and 'receive' calls are GET calls
-
String path = request.getPathInfo();
String targetID = request.getParameter(TARGETID_KEY);
String logID = request.getParameter(LOGID_KEY);
@@ -126,11 +134,14 @@ public class LogServlet extends HttpServ
sendError(response, HttpServletResponse.SC_BAD_REQUEST,
"Unable to interpret query");
}
else if (RECEIVE.equals(path) && !handleReceive(targetID, logID,
range, filter, output)) {
- sendError(response, HttpServletResponse.SC_BAD_REQUEST,
"Unable to interpret receive query");
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST,
"Unable to interpret receive request");
+ }
+ else if (RECEIVE_IDS.equals(path) && !handleReceiveIDs(targetID,
logID, filter, output)) {
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST,
"Unable to interpret receiveids request");
}
}
catch (IOException e) {
- sendError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Unable to process query");
+ sendError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Unable to process request", e);
}
finally {
try {
@@ -144,9 +155,6 @@ public class LogServlet extends HttpServ
}
}
- /**
- * {@inheritDoc}
- */
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
if (!authenticate(req)) {
@@ -269,6 +277,73 @@ public class LogServlet extends HttpServ
m_store.put(events);
return success;
}
+
+ // Handle a call to the send IDs 'command'
+ protected boolean handleSendIDs(ServletInputStream input) throws
IOException {
+ boolean success = true;
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(input));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ try {
+ LowestID lid = new LowestID(line);
+ m_log.log(LogService.LOG_DEBUG, "Lowest ID event
received: '" + line +"'");
+ m_store.setLowestID(lid.getTargetID(),
lid.getStoreID(), lid.getLowestID());
+ }
+ catch (IllegalArgumentException iae) {
+ success = false;
+ m_log.log(LogService.LOG_WARNING, "Could not construct
lowest ID from string: '" + line + "'");
+ }
+ }
+ }
+ finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ }
+ catch (Exception ex) {
+ // not much we can do
+ }
+ }
+ }
+ return success;
+ }
+
+ // Handle a call to the receive 'command'
+ protected boolean handleReceiveIDs(String targetID, String logID, String
filter, ServletOutputStream output) throws IOException {
+ if ((targetID != null) && (logID != null)) {
+ // target and log id are specified, return only the lowest ID that
matches these id's
+ long logid = Long.parseLong(logID);
+ outputLowestID(targetID, logid, output);
+ return true;
+ }
+ else if ((targetID != null) && (logID == null)) {
+ // target id is specified, log id is not, return all events that
belong to the specified target id
+ List<Descriptor> descriptors = m_store.getDescriptors(targetID);
+ for (Descriptor descriptor : descriptors) {
+ outputLowestID(targetID, descriptor.getStoreID(), output);
+ }
+ return true;
+ }
+ else if ((targetID == null) && (logID == null)) {
+ // no target or log id has been specified, return all events
+ List<Descriptor> descriptors = m_store.getDescriptors();
+ for (Descriptor descriptor : descriptors) {
+ outputLowestID(descriptor.getTargetID(),
descriptor.getStoreID(), output);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private void outputLowestID(String targetID, long logID,
ServletOutputStream output) throws IOException {
+ long lowestID = m_store.getLowestID(targetID, logID);
+ if (lowestID > 0) {
+ LowestID lid = new LowestID(targetID, logID, lowestID);
+ output.print(lid.toRepresentation() + "\n");
+ }
+ }
// print string representations of all events in the specified range to
the specified output
private void outputRange(ServletOutputStream output, Descriptor range)
throws IOException {
@@ -280,7 +355,16 @@ public class LogServlet extends HttpServ
// send an error response
private void sendError(HttpServletResponse response, int statusCode,
String description) {
- m_log.log(LogService.LOG_WARNING, "Log request failed: " +
description);
+ sendError(response, statusCode, description, null);
+ }
+
+ private void sendError(HttpServletResponse response, int statusCode,
String description, Throwable t) {
+ if (t == null) {
+ m_log.log(LogService.LOG_WARNING, "Log request failed: " +
description);
+ }
+ else {
+ m_log.log(LogService.LOG_WARNING, "Log request failed: " +
description, t);
+ }
try {
response.sendError(statusCode, description);
}
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/LogStore.java
Wed Oct 29 13:53:39 2014
@@ -113,4 +113,7 @@ public interface LogStore
* @throws java.io.IOException in case of any IO error.
*/
public Event put(String targetID, int type, Dictionary props) throws
IOException;
+
+ public void setLowestID(String targetID, long logID, long lowestID) throws
IOException;
+ public long getLowestID(String targetID, long logID) throws IOException;
}
\ No newline at end of file
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
Wed Oct 29 13:53:39 2014
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
@@ -62,7 +63,8 @@ public class LogStoreImpl implements Log
private int m_maxEvents = 0;
private final ConcurrentMap<String, Set<Long>> m_locks = new
ConcurrentHashMap<String, Set<Long>>();
- private final Map<String, Long> m_fileToID = new HashMap<String, Long>();
+ private final Map<String, Long> m_fileToHighestID = new HashMap<String,
Long>();
+ private final Map<String, Long> m_fileToLowestID = new HashMap<String,
Long>();
public LogStoreImpl(File baseDir, String name) {
m_name = name;
@@ -103,7 +105,7 @@ public class LogStoreImpl implements Log
final SortedRangeSet set = descriptor.getRangeSet();
BufferedReader in = null;
try {
- File log = new File(new File(m_dir,
targetIDToFilename(descriptor.getTargetID())),
String.valueOf(descriptor.getStoreID()));
+ File log = getLogFile(descriptor.getTargetID(),
descriptor.getStoreID());
if (!log.isFile()) {
return result;
}
@@ -119,15 +121,15 @@ public class LogStoreImpl implements Log
else {
counter = -1;
}
- if (set.contains(id)) {
+ if (set.contains(id) && id >=
getLowestIDInternal(descriptor.getTargetID(), descriptor.getStoreID())) {
result.add(event);
}
}
if (counter < 1) {
- m_fileToID.remove(file);
+ m_fileToHighestID.remove(file);
}
else {
- m_fileToID.put(file, counter);
+ m_fileToHighestID.put(file, counter);
}
}
finally {
@@ -152,7 +154,7 @@ public class LogStoreImpl implements Log
}
private Descriptor getDescriptorInternal(String targetID, long logID,
boolean lock) throws IOException {
- Long high = m_fileToID.get(new File(new File(m_dir,
targetIDToFilename(targetID)), String.valueOf(logID)).getAbsolutePath());
+ Long high = m_fileToHighestID.get(getLogFile(targetID,
logID).getAbsolutePath());
if (high != null) {
Range r = new Range(1, high);
return new Descriptor(targetID, logID, new
SortedRangeSet(r.toRepresentation()));
@@ -169,20 +171,20 @@ public class LogStoreImpl implements Log
}
public List<Descriptor> getDescriptors(String targetID) throws IOException
{
- File dir = new File(m_dir, targetIDToFilename(targetID));
+ File dir = getTargetDirectory(targetID);
List<Descriptor> result = new ArrayList<Descriptor>();
if (!dir.isDirectory()) {
return result;
}
- for (String name : notNull(dir.list())) {
+ for (String name : notNull(dir.list(LOGID_FILENAME_FILTER))) {
result.add(getDescriptor(targetID, Long.parseLong(name)));
}
return result;
}
- public List<Descriptor> getDescriptors() throws IOException {
+ public List<Descriptor> getDescriptors() throws IOException {
List<Descriptor> result = new ArrayList<Descriptor>();
for (String name : notNull(m_dir.list())) {
result.addAll(getDescriptors(filenameToTargetID(name)));
@@ -227,8 +229,8 @@ public class LogStoreImpl implements Log
// 1. we can append events at the end of the existing file
// 2. we need to insert events in the existing file (meaning we have to
// rewrite basically the whole file)
- String file = new File(new File(m_dir, targetIDToFilename(targetID)),
String.valueOf(logID)).getAbsolutePath();
- Long highest = m_fileToID.get(file);
+ String file = getLogFile(targetID, logID).getAbsolutePath();
+ Long highest = m_fileToHighestID.get(file);
boolean cached = false;
if (highest != null) {
if (highest.longValue() + 1 == list.get(0).getID()) {
@@ -255,7 +257,7 @@ public class LogStoreImpl implements Log
PrintWriter out = null;
try {
- File dir = new File(m_dir, targetIDToFilename(targetID));
+ File dir = getTargetDirectory(targetID);
if (!dir.isDirectory() && !dir.mkdirs()) {
throw new IOException("Unable to create backup store.");
}
@@ -291,10 +293,10 @@ public class LogStoreImpl implements Log
m_eventAdmin.postEvent(new
org.osgi.service.event.Event(LogStore.EVENT_TOPIC, props));
}
if ((cached) && (high < Long.MAX_VALUE)) {
- m_fileToID.put(file, new Long(high));
+ m_fileToHighestID.put(file, new Long(high));
}
else {
- m_fileToID.remove(file);
+ m_fileToHighestID.remove(file);
}
}
finally {
@@ -307,6 +309,27 @@ public class LogStoreImpl implements Log
}
}
+ private void createTargetDirectory(String targetID) throws IOException {
+ File directory = getTargetDirectory(targetID);
+ if (!directory.isDirectory()) {
+ if (!directory.mkdirs()) {
+ throw new IOException("Could not create
directory: " + directory.getAbsolutePath());
+ }
+ }
+ }
+
+ private File getTargetDirectory(String targetID) {
+ return new File(m_dir, targetIDToFilename(targetID));
+ }
+
+ private File getLogFile(String targetID, Long logID) {
+ return new File(getTargetDirectory(targetID),
String.valueOf(logID));
+ }
+
+ private File getLogFileIndex(String targetID, Long logID) {
+ return new File(getTargetDirectory(targetID),
String.valueOf(logID) + ".index");
+ }
+
/**
* Sort the given list of events into a map of maps according to the
targetID and the logID of each event.
*
@@ -528,4 +551,87 @@ public class LogStoreImpl implements Log
releaseLock(targetID, storeID);
}
}
+
+ @Override
+ public void setLowestID(String targetID, long logID, long lowestID) throws
IOException {
+ obtainLock(targetID, logID);
+ try {
+ long currentID = getLowestIDInternal(targetID, logID);
+ if (currentID < lowestID) {
+ FileWriter fw = null;
+ try {
+ createTargetDirectory(targetID);
+ File index = getLogFileIndex(targetID, logID);
+ fw = new FileWriter(index);
+ fw.write(Long.toString(lowestID));
+ m_fileToLowestID.put(index.getAbsolutePath(),
lowestID);
+ }
+ finally {
+ if (fw != null) {
+ try {
+ fw.close();
+ }
+ catch (IOException ioe) {}
+ }
+ }
+ }
+ }
+ finally {
+ releaseLock(targetID, logID);
+ }
+ }
+
+ public long getLowestID(String targetID, long logID) throws IOException {
+ obtainLock(targetID, logID);
+ try {
+ return getLowestIDInternal(targetID, logID);
+ }
+ finally {
+ releaseLock(targetID, logID);
+ }
+ }
+
+ private long getLowestIDInternal(String targetID, long logID) {
+ File index = getLogFileIndex(targetID, logID);
+ Long result = m_fileToLowestID.get(index.getAbsolutePath());
+ if (result == null) {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(index));
+ String line = br.readLine();
+ br.close();
+ result = Long.parseLong(line);
+ m_fileToLowestID.put(index.getAbsolutePath(), result);
+ }
+ catch (Exception nfe) {
+ // if the file somehow got corrupted, or does not exist,
+ // we simply assume 0 as the default
+ m_fileToLowestID.put(index.getAbsolutePath(), 0L);
+ return 0L;
+ }
+ finally {
+ if (br != null) {
+ try {
+ br.close();
+ }
+ catch (IOException e) {}
+ }
+ }
+ }
+ return result;
+ }
+
+ private static FilenameFilter LOGID_FILENAME_FILTER = new
LogIDFilenameFilter();
+ private static class LogIDFilenameFilter implements FilenameFilter {
+ @Override
+ public boolean accept(File dir, String name) {
+ try {
+ Long.parseLong(name);
+ return true;
+ }
+ catch (NumberFormatException nfe) {
+ return false;
+ }
+ }
+ }
}
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
Wed Oct 29 13:53:39 2014
@@ -193,4 +193,16 @@ public class MongoLogStore implements Lo
// TODO add an event to the appropriate store
return null;
}
+
+ @Override
+ public void setLowestID(String targetID, long logID, long lowestID)
throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getLowestID(String targetID, long logID) throws IOException
{
+ // TODO Auto-generated method stub
+ return 0;
+ }
}
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java
Wed Oct 29 13:53:39 2014
@@ -40,6 +40,7 @@ import org.osgi.service.log.LogService;
public class Activator extends DependencyActivatorBase implements
ManagedServiceFactory {
private static final String KEY_LOG_NAME = "name";
private static final String KEY_MODE = "mode";
+ private static final String KEY_MODE_LOWEST_IDS = "mode-lowest-ids";
private static final String KEY_TARGETID = "tid";
private final Map<String, Component> m_instances = new HashMap<String,
Component>();
@@ -69,13 +70,27 @@ public class Activator extends Dependenc
if ((name == null) || "".equals(name)) {
throw new ConfigurationException(KEY_LOG_NAME, "Log name has to be
specified.");
}
- Mode mode = Mode.PUSH;
+ Mode dataTransferMode = Mode.PUSH;
String modeValue = (String) dict.get(KEY_MODE);
if ("pull".equals(modeValue)) {
- mode = Mode.PULL;
+ dataTransferMode = Mode.PULL;
}
else if ("pushpull".equals(modeValue)) {
- mode = Mode.PUSHPULL;
+ dataTransferMode = Mode.PUSHPULL;
+ }
+ else if ("none".equals(modeValue)) {
+ dataTransferMode = Mode.NONE;
+ }
+ Mode lowestIDsMode = Mode.NONE;
+ modeValue = (String) dict.get(KEY_MODE_LOWEST_IDS);
+ if ("pull".equals(modeValue)) {
+ lowestIDsMode = Mode.PULL;
+ }
+ else if ("pushpull".equals(modeValue)) {
+ lowestIDsMode = Mode.PUSHPULL;
+ }
+ else if ("push".equals(modeValue)) {
+ lowestIDsMode = Mode.PUSH;
}
String targetID = (String) dict.get(KEY_TARGETID);
@@ -84,9 +99,9 @@ public class Activator extends Dependenc
Properties props = new Properties();
props.put(KEY_LOG_NAME, name);
props.put("taskName", LogSyncTask.class.getName());
- props.put("description", "Syncs log (name=" + name + ", mode=" +
mode.toString() + (targetID == null ? "" : ", targetID=" + targetID) + ") with
a server.");
+ props.put("description", "Syncs log (name=" + name + ", mode=" +
dataTransferMode.toString() + (targetID == null ? "" : ", targetID=" +
targetID) + ") with a server.");
String filter = "(&(" + Constants.OBJECTCLASS + "=" +
LogStore.class.getName() + ")(name=" + name + "))";
- LogSyncTask service = new LogSyncTask(name, name, mode, targetID);
+ LogSyncTask service = new LogSyncTask(name, name, dataTransferMode,
lowestIDsMode, targetID);
newComponent = m_manager.createComponent()
.setInterface(new String[] { Runnable.class.getName(),
LogSync.class.getName() }, props)
.setImplementation(service)
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
(original)
+++
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
Wed Oct 29 13:53:39 2014
@@ -41,8 +41,8 @@ import org.apache.ace.connectionfactory.
import org.apache.ace.discovery.Discovery;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
+import org.apache.ace.feedback.LowestID;
import org.apache.ace.log.LogSync;
-import org.apache.ace.log.server.servlet.LogServlet;
import org.apache.ace.log.server.store.LogStore;
import org.apache.ace.range.SortedRangeSet;
import org.osgi.service.log.LogService;
@@ -50,13 +50,14 @@ import org.osgi.service.log.LogService;
public class LogSyncTask implements Runnable, LogSync {
public static enum Mode {
- PUSH, PULL, PUSHPULL
+ NONE, PUSH, PULL, PUSHPULL
}
private static final String COMMAND_QUERY = "query";
private static final String COMMAND_SEND = "send";
-
+ private static final String COMMAND_SEND_IDS = "sendids";
private static final String COMMAND_RECEIVE = "receive";
+ private static final String COMMAND_RECEIVE_IDS = "receiveids";
private static final String TARGETID_KEY = "tid";
@SuppressWarnings("unused")
private static final String FILTER_KEY = "filter";
@@ -72,16 +73,18 @@ public class LogSyncTask implements Runn
private final String m_endpoint;
private final String m_name;
private final String m_targetID;
- private final Mode m_mode;
+ private final Mode m_dataTransferMode;
+ private final Mode m_lowestIDMode;
- public LogSyncTask(String endpoint, String name, Mode mode) {
- this(endpoint, name, mode, null);
+ public LogSyncTask(String endpoint, String name, Mode dataTransferMode,
Mode lowestIDMode) {
+ this(endpoint, name, dataTransferMode, lowestIDMode, null);
}
- public LogSyncTask(String endpoint, String name, Mode mode, String
targetID) {
+ public LogSyncTask(String endpoint, String name, Mode dataTransferMode,
Mode lowestIDMode, String targetID) {
m_endpoint = endpoint;
m_name = name;
- m_mode = mode;
+ m_dataTransferMode = dataTransferMode;
+ m_lowestIDMode = lowestIDMode;
m_targetID = targetID;
}
@@ -100,10 +103,49 @@ public class LogSyncTask implements Runn
public boolean pushpull() throws IOException {
return synchronize(true /* push */, true /* pull */);
}
+
+ public boolean pullIDs() throws IOException {
+ return synchronizeLowestIDs(false, true);
+ }
+
+ public boolean pushIDs() throws IOException {
+ return synchronizeLowestIDs(true, false);
+ }
+
+ public boolean pushpullIDs() throws IOException {
+ return synchronizeLowestIDs(true, true);
+ }
public void run() {
try {
- switch (m_mode) {
+ switch (m_lowestIDMode) {
+ case NONE:
+ break;
+ case PULL:
+ pullIDs();
+ break;
+ case PUSH:
+ pushIDs();
+ break;
+ case PUSHPULL:
+ pushpullIDs();
+ break;
+ }
+ }
+ catch (MalformedURLException e) {
+ m_log.log(LogService.LOG_ERROR, "Unable to (" + m_lowestIDMode +
") synchronize IDs (name=" + m_name + ") with remote (malformed URL, incorrect
configuration?)", e);
+ }
+ catch (ConnectException e) {
+ m_log.log(LogService.LOG_WARNING, "Unable to (" + m_lowestIDMode +
") synchronize IDs (name=" + m_name + ") with remote (connection refused,
remote not up?)", e);
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_ERROR, "Unable to (" + m_lowestIDMode +
") synchronize IDs (name=" + m_name + ") with remote", e);
+ }
+
+ try {
+ switch (m_dataTransferMode) {
+ case NONE:
+ break;
case PULL:
pull();
break;
@@ -116,13 +158,13 @@ public class LogSyncTask implements Runn
}
}
catch (MalformedURLException e) {
- m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString()
+ ") synchronize log (name=" + m_name + ") with remote (malformed URL,
incorrect configuration?)");
+ m_log.log(LogService.LOG_ERROR, "Unable to (" + m_dataTransferMode
+ ") synchronize log (name=" + m_name + ") with remote (malformed URL,
incorrect configuration?)", e);
}
catch (ConnectException e) {
- m_log.log(LogService.LOG_WARNING, "Unable to (" +
m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote
(connection refused, remote not up?)");
+ m_log.log(LogService.LOG_WARNING, "Unable to (" +
m_dataTransferMode + ") synchronize log (name=" + m_name + ") with remote
(connection refused, remote not up?)", e);
}
catch (IOException e) {
- m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString()
+ ") synchronize log (name=" + m_name + ") with remote", e);
+ m_log.log(LogService.LOG_ERROR, "Unable to (" + m_dataTransferMode
+ ") synchronize log (name=" + m_name + ") with remote", e);
}
}
@@ -402,4 +444,119 @@ public class LogSyncTask implements Runn
return result;
}
+
+ private boolean synchronizeLowestIDs(boolean push, boolean pull) throws
IOException {
+ URL host = m_discovery.discover();
+
+ boolean result = false;
+ if (push) {
+ result |= doPushLowestIDs(host);
+ }
+ if (pull) {
+ result |= doPullLowestIDs(host);
+ }
+
+ return result;
+ }
+
+ protected boolean doPushLowestIDs(URL host) {
+ boolean result = false;
+ OutputStream sendOutput = null;
+ HttpURLConnection sendConnection = null;
+
+ try {
+ sendConnection = createConnection(createURL(host,
COMMAND_SEND_IDS));
+ // ACE-294: enable streaming mode causing only small amounts of
memory to be
+ // used for this commit. Otherwise, the entire input stream is
cached into
+ // memory prior to sending it to the server...
+ sendConnection.setChunkedStreamingMode(8192);
+ sendConnection.setDoOutput(true);
+ sendOutput = sendConnection.getOutputStream();
+ BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(sendOutput));
+ try {
+ for (Descriptor d : (m_targetID == null ?
m_logStore.getDescriptors() : m_logStore.getDescriptors(m_targetID))) {
+ long lowestID = m_logStore.getLowestID(d.getTargetID(),
d.getStoreID());
+ if (lowestID > 0) {
+ LowestID lid = new LowestID(d.getTargetID(),
d.getStoreID(), lowestID);
+ writer.write(lid.toRepresentation() + "\n");
+ }
+ }
+ }
+ finally {
+ writer.close();
+ }
+
+ // Will cause a flush and reads the response from the server...
+ int rc = sendConnection.getResponseCode();
+ result = (rc == HttpServletResponse.SC_OK);
+
+ if (!result) {
+ String msg = sendConnection.getResponseMessage();
+ m_log.log(LogService.LOG_WARNING, String.format("Could not
push lowest IDs '%s'. Server response: %s (%d)", m_name, msg, rc));
+ }
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_ERROR, "Unable to (fully) push lowest IDs
with remote", e);
+ }
+ finally {
+ closeSilently(sendOutput);
+ closeSilently(sendConnection);
+ }
+ if (result) {
+ m_log.log(LogService.LOG_DEBUG, "Pushed lowest IDs (" + m_name +
") successfully to remote...");
+ }
+ return result;
+ }
+
+ protected boolean doPullLowestIDs(URL host) {
+ boolean result = false;
+ InputStream receiveInput = null;
+ HttpURLConnection receiveConnection = null;
+ try {
+ URL url = createURL(host, COMMAND_RECEIVE_IDS);
+
+ receiveConnection = createConnection(url);
+ receiveInput = receiveConnection.getInputStream();
+
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(receiveInput));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ try {
+ LowestID lid = new LowestID(line);
+ m_logStore.setLowestID(lid.getTargetID(),
lid.getStoreID(), lid.getLowestID());
+ }
+ catch (IllegalArgumentException e) {
+ // Just skip this one.
+ m_log.log(LogService.LOG_WARNING, "Could not parse
incoming line: " + line + " because: " + e.getMessage(), e);
+ }
+ }
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_DEBUG, "Error reading line from
reader", e);
+ }
+ finally {
+ reader.close();
+ }
+
+ int rc = receiveConnection.getResponseCode();
+ result = (rc == HttpServletResponse.SC_OK);
+
+ if (!result) {
+ String msg = receiveConnection.getResponseMessage();
+ m_log.log(LogService.LOG_WARNING, String.format("Could not
receive lowest IDs '%s'. Server response: %s (%d)", m_name, msg, rc));
+ }
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_ERROR, "Unable to connect to receive
lowest IDs.", e);
+ }
+ finally {
+ closeSilently(receiveInput);
+ closeSilently(receiveConnection);
+ }
+ if (result) {
+ m_log.log(LogService.LOG_DEBUG, "Pulled lowest IDs (" + m_name +
") successfully from remote...");
+ }
+ return result;
+ }
}
Modified:
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
(original)
+++
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
Wed Oct 29 13:53:39 2014
@@ -22,10 +22,11 @@ import static org.apache.ace.test.utils.
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Dictionary;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
@@ -83,6 +84,36 @@ public class LogServletTest {
assert result;
assert (m_range.toRepresentation() + "\n").equals(output.m_text);
}
+
+ @Test(groups = { UNIT })
+ public void receiveLowestID() throws Exception {
+ // no lowest ID set
+ MockServletOutputStream output = new MockServletOutputStream();
+ boolean result = m_logServlet.handleReceiveIDs(m_range.getTargetID(),
String.valueOf(m_range.getStoreID()), null, output);
+ assert result;
+ String expected = "";
+ String actual = output.m_text;
+ assert expected.equals(actual) : "We expected '" + expected + "', but
received '" + actual + "'";
+
+ // set lowest ID
+ m_mockStore.setLowestID(m_range.getTargetID(), m_range.getStoreID(),
5);
+ output = new MockServletOutputStream();
+ result = m_logServlet.handleReceiveIDs(m_range.getTargetID(),
String.valueOf(m_range.getStoreID()), null, output);
+ assert result;
+ expected = m_range.getTargetID() + "," + m_range.getStoreID() + ",5\n";
+ actual = output.m_text;
+ assert expected.equals(actual) : "We expected '" + expected + "', but
received '" + actual + "'";
+ }
+
+ @Test(groups = { UNIT })
+ public void sendLowestID() throws Exception {
+ MockServletInputStream input = new MockServletInputStream();
+ String expected = m_range.getTargetID() + "," + m_range.getStoreID() +
",9\n";
+ input.setBytes(expected.getBytes());
+ m_logServlet.handleSendIDs(input);
+ long lowestID = m_mockStore.getLowestID(m_range.getTargetID(),
m_range.getStoreID());
+ assert 9 == lowestID : "Expected lowest ID to be 9, but got: "
+ lowestID;
+ }
@Test(groups = { UNIT })
public void receiveLog() throws Exception {
@@ -116,6 +147,46 @@ public class LogServletTest {
assert expected.equals(actual);
}
+ private static class Tuple {
+ private final String m_targetID;
+ private final long m_logID;
+ public Tuple(String targetID, long logID) {
+ if (targetID == null) {
+ throw new IllegalArgumentException("TargetID cannot be
null");
+ }
+ m_targetID = targetID;
+ m_logID = logID;
+ }
+ public String getTargetID() {
+ return m_targetID;
+ }
+ public long getLogID() {
+ return m_logID;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (m_logID ^ (m_logID >>>
32));
+ result = prime * result + m_targetID.hashCode();
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Tuple other = (Tuple) obj;
+ return (m_logID == other.getLogID() &&
m_targetID.equals(other.getTargetID()));
+ }
+ }
+
private class MockLogStore implements LogStore {
public List<Event> m_events = new ArrayList<Event>();
@@ -152,6 +223,17 @@ public class LogServletTest {
public Event put(String targetID, int type, Dictionary props) throws
IOException {
throw new UnsupportedOperationException("not implemented");
}
+ private Map<Tuple, Long> m_lowestIDs = new HashMap<>();
+
+ @Override
+ public void setLowestID(String targetID, long logID, long
lowestID) throws IOException {
+ m_lowestIDs.put(new Tuple(targetID, logID), lowestID);
+ }
+ @Override
+ public long getLowestID(String targetID, long logID) throws
IOException {
+ Long result = m_lowestIDs.get(new Tuple(targetID,
logID));
+ return result == null ? 0 : result.longValue();
+ }
}
private class MockServletOutputStream extends ServletOutputStream {
Modified:
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
(original)
+++
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
Wed Oct 29 13:53:39 2014
@@ -82,12 +82,7 @@ public class ServerLogStoreTester {
}
m_logStore.put(events);
assert m_logStore.getDescriptors().size() == 3 * 4 : "Incorrect amount
of ranges returned from store";
- List<Event> stored = new ArrayList<Event>();
- for (Descriptor range : m_logStore.getDescriptors()) {
- for (Descriptor range2 :
m_logStore.getDescriptors(range.getTargetID())) {
-
stored.addAll(m_logStore.get(m_logStore.getDescriptor(range2.getTargetID(),
range2.getStoreID())));
- }
- }
+ List<Event> stored = getStoredEvents();
Set<String> in = new HashSet<String>();
for (Event event : events) {
@@ -100,6 +95,52 @@ public class ServerLogStoreTester {
assert in.equals(out) : "Stored events differ from the added.";
}
+
+ @SuppressWarnings("serial")
+ @Test(groups = { UNIT })
+ public void testLogLowestID() throws IOException {
+ Map<String, String> props = new HashMap<String, String>();
+ props.put("test", "bar");
+
+ List<Descriptor> ranges = m_logStore.getDescriptors();
+ assert ranges.isEmpty() : "New store should have no ranges.";
+ List<Event> events = new ArrayList<Event>();
+
+ assert 0 == m_logStore.getLowestID("target", 1) : "Lowest ID should be
0 by default, not: " + m_logStore.getLowestID("target", 1);
+ m_logStore.setLowestID("target", 1, 10);
+ assert 10 == m_logStore.getLowestID("target", 1) : "Lowest ID should
be 10, not: " + m_logStore.getLowestID("target", 1);
+ assert 0 == m_logStore.getLowestID("target", 0) : "Lowest ID should be
0 by default, not: " + m_logStore.getLowestID("target", 1);
+ assert 0 == m_logStore.getLowestID("target2", 1) : "Lowest ID should
be 0 by default, not: " + m_logStore.getLowestID("target", 1);
+
+ for (long id = 0; id < 20; id++) {
+ events.add(new Event("target", 1, id, System.currentTimeMillis(),
AuditEvent.FRAMEWORK_STARTED, props));
+ }
+ m_logStore.put(events);
+ assert m_logStore.getDescriptors().size() == 1 : "Incorrect amount of
ranges returned from store";
+ List<Event> stored = getStoredEvents();
+ assert stored.size() == 10 : "Exactly 10 events should have been
stored";
+ m_logStore.setLowestID("target", 1, 19);
+ stored = getStoredEvents();
+ assert stored.size() == 1 : "Exactly 1 event should have been stored";
+ m_logStore.setLowestID("target", 1, 20);
+ stored = getStoredEvents();
+ assert stored.size() == 0 : "No events should have been stored";
+ m_logStore.setLowestID("target", 1, 100);
+ stored = getStoredEvents();
+ assert stored.size() == 0 : "No events should have been stored";
+ }
+
+ private List<Event> getStoredEvents() throws IOException {
+ List<Event> stored = new ArrayList<Event>();
+ for (Descriptor range : m_logStore.getDescriptors()) {
+ for (Descriptor range2 :
m_logStore.getDescriptors(range.getTargetID())) {
+
stored.addAll(m_logStore.get(m_logStore.getDescriptor(range2.getTargetID(),
range2.getStoreID())));
+ }
+ }
+ return stored;
+ }
+
+
@Test(groups = { UNIT })
public void testCreateLogMessagesConcurrently() throws Exception {
final Properties props = new Properties();
@@ -123,12 +164,7 @@ public class ServerLogStoreTester {
exec.shutdown();
exec.awaitTermination(10, TimeUnit.SECONDS);
assert m_logStore.getDescriptors().size() == 10 : "Incorrect amount of
ranges returned from store: " + m_logStore.getDescriptors().size();
- List<Event> stored = new ArrayList<Event>();
- for (Descriptor range : m_logStore.getDescriptors()) {
- for (Descriptor range2 :
m_logStore.getDescriptors(range.getTargetID())) {
-
stored.addAll(m_logStore.get(m_logStore.getDescriptor(range2.getTargetID(),
range2.getStoreID())));
- }
- }
+ List<Event> stored = getStoredEvents();
assert stored.size() == 10000 : "Incorrect number of events got
stored: " + stored.size();
}
@@ -270,12 +306,7 @@ public class ServerLogStoreTester {
es.awaitTermination(60, TimeUnit.SECONDS);
int size = m_logStore.getDescriptors().size();
assert size == 3 * 4 : "Incorrect amount of ranges returned from
store: " + size;
- List<Event> stored = new ArrayList<Event>();
- for (Descriptor range : m_logStore.getDescriptors()) {
- for (Descriptor range2 :
m_logStore.getDescriptors(range.getTargetID())) {
-
stored.addAll(m_logStore.get(m_logStore.getDescriptor(range2.getTargetID(),
range2.getStoreID())));
- }
- }
+ List<Event> stored = getStoredEvents();
Set<String> out = new HashSet<String>();
for (Event event : stored) {
Modified:
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/task/LogTaskTest.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/task/LogTaskTest.java?rev=1635133&r1=1635132&r2=1635133&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/task/LogTaskTest.java
(original)
+++
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/task/LogTaskTest.java
Wed Oct 29 13:53:39 2014
@@ -39,7 +39,7 @@ public class LogTaskTest {
public List<Descriptor> m_calledWith = new ArrayList<Descriptor>();
public MockLogSyncTask(String endpoint, String name) {
- super(endpoint, name, LogSyncTask.Mode.PUSH);
+ super(endpoint, name, LogSyncTask.Mode.PUSH,
LogSyncTask.Mode.NONE);
}
public void clear() {