Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java Tue May 7 10:25:13 2013 @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ace.agent.logging; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.util.Iterator; +import java.util.List; + +import org.apache.ace.connectionfactory.ConnectionFactory; +import org.apache.ace.discovery.Discovery; +import org.apache.ace.identification.Identification; +import org.apache.ace.log.LogDescriptor; +import org.apache.ace.log.LogEvent; +import org.apache.ace.log.target.store.LogStore; +import org.apache.ace.range.RangeIterator; +import org.apache.ace.range.SortedRangeSet; +import org.osgi.service.log.LogService; + +//FIXME This is a of the org.apache.ace.log it is private and may be better located here. + +// TODO there are two versions of this class around, the other ohne being the server.LogSyncTask, +// and both are fairly similar +public class LogSyncTask implements Runnable { + + private static final String COMMAND_QUERY = "query"; + private static final String COMMAND_SEND = "send"; + private static final String PARAMETER_TARGETID = "tid"; + private static final String PARAMETER_LOGID = "logid"; + + // injected by dependencymanager + private volatile Discovery m_discovery; + private volatile Identification m_identification; + private volatile LogService m_log; + private volatile LogStore m_LogStore; + private volatile ConnectionFactory m_connectionFactory; + + private final String m_endpoint; + + public LogSyncTask(String endpoint) { + m_endpoint = endpoint; + } + + /** + * Synchronize the log events available remote with the events available locally. + */ + public void run() { + URL host = m_discovery.discover(); + + if (host == null) { + // expected if there's no discovered + // ps or relay server + m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available"); + return; + } + + if ("file".equals(host.getProtocol())) { + // if the discovery URL is a file, we cannot sync, so we silently return here + return; + } + + String targetId = m_identification.getID(); + URLConnection sendConnection = null; + try { + sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND)); + sendConnection.setDoOutput(true); + if (sendConnection instanceof HttpURLConnection) { + // ACE-294: enable streaming mode causing only small amounts of memory to be + // used for this commit. Otherwise, the entire input stream is cached into + // memory prior to sending it to the server... + ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192); + } + + long[] logIDs = m_LogStore.getLogIDs(); + for (int i = 0; i < logIDs.length; i++) { + URL url = new URL(host, m_endpoint + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID + "=" + targetId + "&" + PARAMETER_LOGID + "=" + logIDs[i]); + + URLConnection queryConnection = m_connectionFactory.createConnection(url); + // TODO: make sure no actual call is made using sendConnection + // when there's nothing to sync + synchronizeLog(logIDs[i], queryConnection.getInputStream(), sendConnection); + } + + // Make sure to send the actual POST request... + sendConnection.getContent(); + } + catch (ConnectException e) { + m_log.log(LogService.LOG_WARNING, "Unable to connect to remote (endpoint=" + m_endpoint + ")"); + } + catch (IOException e) { + m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote (endpoint=" + m_endpoint + ")", e); + } + finally { + if (sendConnection instanceof HttpURLConnection) { + ((HttpURLConnection) sendConnection).disconnect(); + } + } + } + + /** + * Synchronizes a single log (there can be multiple log/logid's per target). + * + * @param logID + * ID of the log to synchronize. + * @param queryInput + * Stream pointing to a query result for the events available remotely for this log id + * @param sendConnection + * .getOutputStream() Stream to write the events to that are missing on the remote side. + * @throws java.io.IOException + * If synchronization could not be completed due to an I/O failure. + */ + protected void synchronizeLog(long logID, InputStream queryInput, URLConnection sendConnection) throws IOException { + long highestLocal = m_LogStore.getHighestID(logID); + if (highestLocal == 0) { + // No events, no need to synchronize + return; + } + + SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal); + SortedRangeSet remoteRange = getDescriptor(queryInput).getRangeSet(); + SortedRangeSet delta = remoteRange.diffDest(localRange); + RangeIterator rangeIterator = delta.iterator(); + + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream())); + + if (rangeIterator.hasNext()) { + long lowest = rangeIterator.next(); + long highest = delta.getHigh(); + if (lowest <= highest) { + List events = m_LogStore.get(logID, lowest, highestLocal > highest ? highest : highestLocal); + Iterator iter = events.iterator(); + while (iter.hasNext()) { + LogEvent current = (LogEvent) iter.next(); + while ((current.getID() > lowest) && rangeIterator.hasNext()) { + lowest = rangeIterator.next(); + } + if (current.getID() == lowest) { + // before we send the LogEvent to the other side, we fill out the + // appropriate identification + LogEvent event = new LogEvent(m_identification.getID(), current); + writer.write(event.toRepresentation() + "\n"); + } + } + } + } + + writer.flush(); + } + + /** + * Retrieves a LogDescriptor object from the specified stream. + * + * @param queryInput + * Stream containing a LogDescriptor object. + * @return LogDescriptor object reflecting the range contained in the stream. + * @throws java.io.IOException + * If no range could be determined due to an I/O failure. + */ + protected LogDescriptor getDescriptor(InputStream queryInput) throws IOException { + BufferedReader queryReader = null; + try { + queryReader = new BufferedReader(new InputStreamReader(queryInput)); + String rangeString = queryReader.readLine(); + if (rangeString != null) { + try { + return new LogDescriptor(rangeString); + } + catch (IllegalArgumentException iae) { + throw new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")"); + } + } + else { + throw new IOException("Could not construct LogDescriptor from stream because stream is empty"); + } + } + finally { + if (queryReader != null) { + try { + queryReader.close(); + } + catch (Exception ex) { + // not much we can do + } + } + } + } +}
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java Tue May 7 10:25:13 2013 @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ace.agent.logging; + +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.ace.agent.spi.ComponentFactoryBase; +import org.apache.ace.connectionfactory.ConnectionFactory; +import org.apache.ace.discovery.Discovery; +import org.apache.ace.identification.Identification; +import org.apache.ace.log.target.store.LogStore; +import org.apache.ace.scheduler.constants.SchedulerConstants; +import org.apache.felix.dm.Component; +import org.apache.felix.dm.DependencyManager; +import org.osgi.framework.BundleContext; +import org.osgi.service.log.LogService; + +/** + * Creates a executor whiteboard {@link Runnable} service components with a {@link LogSyncTask} implementation for every + * configured store unless explicitly disabled. + * + */ +public class LogSyncTaskFactory extends ComponentFactoryBase { + + @Override + public Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) { + + Set<Component> components = new HashSet<Component>(); + String value = configuration.get(LogFactory.LOG_STORES); + String[] stores = value.split(","); + for (String store : stores) { + + String sync = configuration.get(LogFactory.LOG_STORES + "." + store + ".sync"); + if (sync != null && sync.trim().toLowerCase().equals("false")) { + System.err.println("Disabled " + getAgentIdentifier(configuration) + "/" + store); + logService.log(LogService.LOG_DEBUG, "Log sync disabled for agent " + getAgentIdentifier(configuration) + "/" + store); + } + else { + components.add(createLogSyncComponent(context, manager, logService, configuration, store.trim())); + } + } + return components; + } + + private Component createLogSyncComponent(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration, String store) { + + String schedulerName = getAgentIdentifier(configuration) + "-" + store; + String description = "Task that synchronizes log store " + store + " for agent=" + getAgentIdentifier(configuration) + " on the target and server"; + + Properties props = getAgentproperties(configuration); + props.put(LogFactory.LOG_NAME, store); + + props.put(SchedulerConstants.SCHEDULER_NAME_KEY, LogSyncTask.class.getSimpleName()); + props.put(SchedulerConstants.SCHEDULER_DESCRIPTION_KEY, description); + props.put(SchedulerConstants.SCHEDULER_RECIPE, "2000"); + + Component component = manager.createComponent() + .setInterface(Runnable.class.getName(), props) + .setImplementation(new LogSyncTask(store)) + .add(manager.createServiceDependency() + .setService(ConnectionFactory.class, getAgentFilter(configuration, null)) + .setRequired(true)) + .add(manager.createServiceDependency() + .setService(LogStore.class, getAgentFilter(configuration, "(" + LogFactory.LOG_NAME + "=" + store + ")")) + .setRequired(true)) + .add(manager.createServiceDependency() + .setService(Discovery.class, getAgentFilter(configuration, null)) + .setRequired(true)) + .add(manager.createServiceDependency() + .setService(Identification.class, getAgentFilter(configuration, null)) + .setRequired(true)) + .add(manager.createServiceDependency() + .setService(LogService.class).setRequired(false)); + + return component; + } +} Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo Tue May 7 10:25:13 2013 @@ -0,0 +1 @@ +version 1.0 \ No newline at end of file Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java Tue May 7 10:25:13 2013 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ace.agent.spi; + +import java.util.Map; +import java.util.Set; + +import org.apache.felix.dm.Component; +import org.apache.felix.dm.DependencyManager; +import org.osgi.framework.BundleContext; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.log.LogService; + +/** + * SPI for Management Agent component factories. The create method is called for every individual agent configuration. + * The factory can return zero or more components and should not add them to the manager. Factories are create and + * disposed as required. must have a public default constructor, and are expected to be state-less and thread-safe. + * + */ +public interface ComponentFactory { + + /** + * Return zero or more service components for the specified agent configuartion. + * + * @param context + * The Bundle Context + * @param manager + * The Dependency Manager + * @param logService + * The Log Service + * @param configuration + * The agent configuration + * @return A set of components, not <code>null</code> + * @throws ConfigurationException + * If there is a fatal problem. + */ + Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException; +} Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java Tue May 7 10:25:13 2013 @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ace.agent.spi; + +import java.util.Map; +import java.util.Properties; + +/** + * Component factory base class that provides some convenience methods for concrete implementations. + */ +public abstract class ComponentFactoryBase implements ComponentFactory { + + /** + * Returns the agent identifier from a configuration. + * + * @param configuration + * The configuration + * @return The identifier + */ + protected String getAgentIdentifier(Map<String, String> configuration) { + return configuration.get("agent"); + } + + /** + * Returns mutable service properties with agent identifier pre-configured. + * + * @param configuration + * The configuration + * @return The properties + */ + protected Properties getAgentproperties(Map<String, String> configuration) { + Properties properties = new Properties(); + properties.put("agent", getAgentIdentifier(configuration)); + return properties; + } + + /** + * Returns a service filter that scopes to the agent identifier. Optionally wraps a base filter. + * + * @param configuration + * The configuration + * @param base + * The optional base filter + * @return The filter + */ + protected String getAgentFilter(Map<String, String> configuration, String base) { + if (base == null) { + return "(agent=" + getAgentIdentifier(configuration) + ")"; + } + else { + return "(&(agent=" + getAgentIdentifier(configuration) + ")" + base + ")"; + } + } +} Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java Tue May 7 10:25:13 2013 @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ace.agent.spi; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.felix.dm.Component; +import org.apache.felix.dm.DependencyManager; +import org.osgi.framework.BundleContext; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.log.LogService; + +/** + * Convenience base class for component factories that return just one component. + * + */ +public abstract class OneComponentFactoryBase extends ComponentFactoryBase { + + @Override + public final Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException { + Component component = createComponent(context, manager, logService, configuration); + if (component != null) { + Set<Component> components = new HashSet<Component>(); + components.add(component); + return components; + } + return Collections.emptySet(); + } + + /** + * Returns a component for the specified agent configuration. + * + * @param context + * The Bundle Context + * @param manager + * The Dependency manager + * @param logService + * The Log Service + * @param configuration + * The agent configuration + * @return A component, or <code>null</code> + * @throws ConfigurationException + * If there is a fatal problem + */ + public abstract Component createComponent(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException; +} Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo?rev=1479841&view=auto ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo (added) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo Tue May 7 10:25:13 2013 @@ -0,0 +1 @@ +version 1.0 \ No newline at end of file
