Author: bramk
Date: Mon Aug 19 10:23:36 2013
New Revision: 1515342
URL: http://svn.apache.org/r1515342
Log:
ACE-347 Initial feedback channel implementation
Added:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
Modified:
ace/trunk/org.apache.ace.agent/bnd.bnd
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
Modified: ace/trunk/org.apache.ace.agent/bnd.bnd
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/bnd.bnd?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/bnd.bnd (original)
+++ ace/trunk/org.apache.ace.agent/bnd.bnd Mon Aug 19 10:23:36 2013
@@ -3,10 +3,12 @@ Bundle-Description: Implementation of th
Bundle-Version: 1.0.0
Bundle-Activator: org.apache.ace.agent.impl.Activator
-Private-Package: \
+Private-Package: org.apache.ace.range,\
+ org.apache.ace.log.util,\
+ org.apache.ace.log,\
org.apache.ace.agent.impl,\
org.apache.ace.agent.updater,\
- org.apache.commons.codec,\
+ org.apache.commons.codec,\
org.apache.commons.codec.binary,\
org.apache.commons.codec.digest,\
org.apache.commons.codec.language,\
@@ -45,7 +47,9 @@ Export-Package: org.apache.ace.agent,\
org.easymock,\
org.apache.felix.dependencymanager,\
commons-codec;version=1.4.0,\
- org.apache.felix.http.jetty;version=2.2.1
+ org.apache.felix.http.jetty;version=2.2.1,\
+ org.apache.ace.range.api;version=latest,\
+ org.apache.ace.log.api;version=latest
-sources false
-runfw: org.apache.felix.framework;version='[4.0.3,4.0.3]'
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java
Mon Aug 19 10:23:36 2013
@@ -40,5 +40,5 @@ public interface FeedbackChannel {
* @param type
* @param properties
*/
- void write(int type, Map<String, String> properties);
+ void write(int type, Map<String, String> properties) throws IOException;
}
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
(original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
Mon Aug 19 10:23:36 2013
@@ -18,6 +18,8 @@
*/
package org.apache.ace.agent.impl;
+import java.io.File;
+import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +35,8 @@ import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
// TODO Decouple from DM to save 170k in agent size. Or: just include what we
use
public class Activator extends DependencyActivatorBase implements AgentContext
{
@@ -47,15 +51,19 @@ public class Activator extends Dependenc
private volatile AgentControlImpl m_agentControl;
private volatile AgentUpdateHandlerImpl m_agentUpdateHandler; // we use
the implementation type here on purpose
- private volatile DefaultController m_controller;
+ private volatile EventLoggerImpl m_eventLogger;
+ private volatile DefaultController m_defaultController;
- private DependencyManager m_manager;
- private Component m_component;
+ private BundleContext m_bundleContext;
+ private DependencyManager m_dependencyManager;
+ private Component m_agentControlComponent;
+ private Component m_eventLoggerComponent;
@Override
public void init(BundleContext context, DependencyManager manager) throws
Exception {
- m_manager = manager;
+ m_bundleContext = context;
+ m_dependencyManager = manager;
m_executorService = Executors.newScheduledThreadPool(1);
m_configurationHandler = new ConfigurationHandlerImpl(this);
@@ -66,6 +74,7 @@ public class Activator extends Dependenc
Component service = createComponent().setImplementation(this)
.setCallbacks("initAgent", "startAgent", "stopAgent",
"destroyAgent")
+ .setAutoConfig(BundleContext.class, false)
.setAutoConfig(DependencyManager.class, false)
.setAutoConfig(Component.class, false);
@@ -91,9 +100,11 @@ public class Activator extends Dependenc
}
if
(!Boolean.parseBoolean(System.getProperty("agent.defaultcontroller.disabled")))
{
- m_controller = new DefaultController(m_agentControl,
m_executorService);
+ m_defaultController = new DefaultController(m_agentControl,
m_executorService);
}
-
+
+ m_eventLogger = new EventLoggerImpl(m_agentControl, m_bundleContext);
+
manager.add(service);
}
@@ -103,12 +114,25 @@ public class Activator extends Dependenc
void startAgent() throws Exception {
System.out.println("Starting agent!");
- m_component = createComponent()
+
+ m_agentControlComponent = createComponent()
.setInterface(AgentControl.class.getName(), null)
.setImplementation(m_agentControl);
- m_manager.add(m_component);
- if (m_controller != null) {
- m_controller.start();
+ m_dependencyManager.add(m_agentControlComponent);
+
+ m_eventLoggerComponent = createComponent()
+ .setInterface(EventHandler.class.getName(), new Properties() {
+ {
+ put(EventConstants.EVENT_TOPIC,
EventLoggerImpl.TOPICS_INTEREST);
+ }
+ })
+ .setImplementation(m_eventLogger);
+ m_dependencyManager.add(m_eventLoggerComponent);
+ m_bundleContext.addBundleListener(m_eventLogger);
+ m_bundleContext.addFrameworkListener(m_eventLogger);
+
+ if (m_defaultController != null) {
+ m_defaultController.start();
}
// at this point we know the agent has started, so any updater bundle
that
// might still be running can be uninstalled
@@ -117,10 +141,15 @@ public class Activator extends Dependenc
void stopAgent() throws Exception {
System.out.println("Stopping agent");
- if (m_controller != null) {
- m_controller.stop();
+ if (m_defaultController != null) {
+ m_defaultController.stop();
}
- m_manager.remove(m_component);
+
+ m_bundleContext.removeFrameworkListener(m_eventLogger);
+ m_bundleContext.removeBundleListener(m_eventLogger);
+ m_dependencyManager.remove(m_eventLoggerComponent);
+
+ m_dependencyManager.remove(m_agentControlComponent);
}
@Override
@@ -157,9 +186,14 @@ public class Activator extends Dependenc
public DownloadHandler getDownloadHandler() {
return m_downloadHandler;
}
-
+
@Override
public AgentUpdateHandler getAgentUpdateHandler() {
return m_agentUpdateHandler;
}
+
+ @Override
+ public File getWorkDir() {
+ return m_bundleContext.getDataFile("");
+ }
}
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java
Mon Aug 19 10:23:36 2013
@@ -18,6 +18,7 @@
*/
package org.apache.ace.agent.impl;
+import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ace.agent.AgentUpdateHandler;
@@ -31,18 +32,20 @@ import org.apache.ace.agent.Identificati
public interface AgentContext {
IdentificationHandler getIdentificationHandler();
-
+
DiscoveryHandler getDiscoveryHandler();
ConnectionHandler getConnectionHandler();
-
+
DeploymentHandler getDeploymentHandler();
-
+
DownloadHandler getDownloadHandler();
-
+
ScheduledExecutorService getExecutorService();
-
+
ConfigurationHandler getConfigurationHandler();
AgentUpdateHandler getAgentUpdateHandler();
+
+ File getWorkDir();
}
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java
Mon Aug 19 10:23:36 2013
@@ -18,7 +18,11 @@
*/
package org.apache.ace.agent.impl;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.ace.agent.AgentControl;
import org.apache.ace.agent.AgentUpdateHandler;
@@ -31,8 +35,12 @@ public class AgentControlImpl implements
private final AgentContext m_agentContext;
- public AgentControlImpl(AgentContext agentContext) {
+ private final Map<String, FeedbackChannelImpl> m_feedbackChannels = new
HashMap<String, FeedbackChannelImpl>();
+
+ public AgentControlImpl(AgentContext agentContext) throws IOException {
m_agentContext = agentContext;
+ // TODO get from configuration
+ m_feedbackChannels.put("auditlog", new
FeedbackChannelImpl(m_agentContext, "auditlog"));
}
@Override
@@ -52,16 +60,17 @@ public class AgentControlImpl implements
@Override
public List<String> getFeedbackChannelNames() {
- // TODO Auto-generated method stub
- return null;
+ // TODO get from configuration
+ List<String> channels = new ArrayList<String>();
+ channels.addAll(m_feedbackChannels.keySet());
+ return channels;
}
@Override
public FeedbackChannel getFeedbackChannel(String name) {
- // TODO Auto-generated method stub
- return null;
+ return m_feedbackChannels.get(name);
}
-
+
@Override
public AgentUpdateHandler getAgentUpdateHandler() {
return m_agentContext.getAgentUpdateHandler();
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
(original)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
Mon Aug 19 10:23:36 2013
@@ -20,6 +20,7 @@ package org.apache.ace.agent.impl;
import java.io.IOException;
import java.io.InputStream;
+import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -29,6 +30,7 @@ import org.apache.ace.agent.AgentControl
import org.apache.ace.agent.AgentUpdateHandler;
import org.apache.ace.agent.ConfigurationHandler;
import org.apache.ace.agent.DeploymentHandler;
+import org.apache.ace.agent.FeedbackChannel;
import org.apache.ace.agent.RetryAfterException;
import org.osgi.framework.Version;
@@ -60,7 +62,8 @@ public class DefaultController implement
long syncInterval = getSyncInterval();
try {
runSafeAgent();
- //runSafe();
+ // runSafeUpdate();
+ runSafeFeedback();
}
catch (RetryAfterException e) {
syncInterval = e.getSeconds();
@@ -76,7 +79,7 @@ public class DefaultController implement
reSchedule(syncInterval);
}
- public void runSafe() throws RetryAfterException, IOException {
+ private void runSafeUpdate() throws RetryAfterException, IOException {
DeploymentHandler deploymentHandler = getDeploymentHandler();
@@ -97,7 +100,17 @@ public class DefaultController implement
}
}
}
- public void runSafeAgent() throws RetryAfterException, IOException {
+
+ private void runSafeFeedback() throws RetryAfterException, IOException {
+ List<String> channelNames = m_agentControl.getFeedbackChannelNames();
+ for (String channelName : channelNames) {
+ FeedbackChannel channel =
m_agentControl.getFeedbackChannel(channelName);
+ if (channel != null)
+ channel.sendFeedback();
+ }
+ }
+
+ private void runSafeAgent() throws RetryAfterException, IOException {
AgentUpdateHandler deploymentHandler = getAgentUpdateHandler();
Added:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java?rev=1515342&view=auto
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
(added)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
Mon Aug 19 10:23:36 2013
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ace.agent.AgentControl;
+import org.apache.ace.agent.FeedbackChannel;
+import org.apache.ace.log.AuditEvent;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.BundleListener;
+import org.osgi.framework.Constants;
+import org.osgi.framework.FrameworkEvent;
+import org.osgi.framework.FrameworkListener;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.Version;
+import org.osgi.service.deploymentadmin.DeploymentAdmin;
+import org.osgi.service.deploymentadmin.DeploymentPackage;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+/**
+ * Service component that listens for
+ *
+ */
+// TODO quick copy & paste & simplify from org.apache.ace.log.listener.*
+// TODO Which event types to log must be configurable
+// TODO split into separate listeners
+public class EventLoggerImpl implements BundleListener, FrameworkListener,
EventHandler {
+
+ /*
+ * FIXME This is a simplified quick copy and paste of
org.apache.ace.log.listener.* without caching and async. I
+ * think that is OK. However we need to revisit all logging/monitoring and
this logic should probably be made
+ * configurable split up is separate components.
+ *
+ * @see EvenLoggerFactory as well
+ */
+
+ public static final String EVENTLOGGER_FEEDBACKCHANNEL = "auditlog";
+
+ public static final String[] TOPICS_INTEREST = new String[] {
"org/osgi/service/deployment/*", "org/apache/ace/deployment/*" };
+
+ public static final String TOPIC_INSTALL =
"org/osgi/service/deployment/INSTALL";
+ public static final String TOPIC_UNINSTALL =
"org/osgi/service/deployment/UNINSTALL";
+ public static final String TOPIC_COMPLETE =
"org/osgi/service/deployment/COMPLETE";
+ public static final String TOPIC_DEPLOYMENTPACKAGE_INSTALL =
"org/apache/ace/deployment/INSTALL";
+
+ private final BundleContext m_bundleContext;
+ private final AgentControl m_agentControl;
+
+ public EventLoggerImpl(AgentControl agentControl, BundleContext
bundleContext) {
+ m_agentControl = agentControl;
+ m_bundleContext = bundleContext;
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ int eventType = AuditEvent.DEPLOYMENTADMIN_BASE;
+ Map<String, String> props = new HashMap<String, String>();
+
+ String topic = event.getTopic();
+
+ if (topic.equals(TOPIC_DEPLOYMENTPACKAGE_INSTALL)) {
+ String url = (String) event.getProperty("deploymentpackage.url");
+ String version = (String)
event.getProperty("deploymentpackage.version");
+ eventType = AuditEvent.DEPLOYMENTCONTROL_INSTALL;
+ props.put(AuditEvent.KEY_VERSION, version);
+ props.put(AuditEvent.KEY_NAME, url);
+ }
+ else if (topic.equals(TOPIC_INSTALL)) {
+ String deplPackName = (String)
event.getProperty("deploymentpackage.name");
+ eventType = AuditEvent.DEPLOYMENTADMIN_INSTALL;
+ props.put(AuditEvent.KEY_NAME, deplPackName);
+ }
+
+ else if (topic.equals(TOPIC_UNINSTALL)) {
+ String deplPackName = (String)
event.getProperty("deploymentpackage.name");
+ eventType = AuditEvent.DEPLOYMENTADMIN_UNINSTALL;
+ props.put(AuditEvent.KEY_NAME, deplPackName);
+ }
+ else if (topic.equals(TOPIC_COMPLETE)) {
+ String deplPackName = (String)
event.getProperty("deploymentpackage.name");
+ // to retrieve the version, DeploymentAdmin has to be used
+ ServiceReference ref =
m_bundleContext.getServiceReference(DeploymentAdmin.class.getName());
+ if (ref != null) {
+ DeploymentAdmin deplAdmin = (DeploymentAdmin)
m_bundleContext.getService(ref);
+ if (deplAdmin != null) {
+ DeploymentPackage dp =
deplAdmin.getDeploymentPackage(deplPackName);
+ if (dp != null) {
+ Version version = dp.getVersion();
+ if (version != null) {
+ props.put(AuditEvent.KEY_VERSION,
version.toString());
+ }
+ }
+ // after use, release the service as is it not needed
anymore
+ m_bundleContext.ungetService(ref);
+ }
+ }
+ eventType = AuditEvent.DEPLOYMENTADMIN_COMPLETE;
+ props.put(AuditEvent.KEY_NAME, deplPackName);
+ Boolean success = (Boolean) event.getProperty("successful");
+ props.put(AuditEvent.KEY_SUCCESS, success.toString());
+ }
+ writeEvent(eventType, props);
+ }
+
+ @Override
+ public void frameworkEvent(FrameworkEvent event) {
+ int eventType = AuditEvent.FRAMEWORK_BASE;
+ Map<String, String> props = new HashMap<String, String>();
+ Bundle bundle = event.getBundle();
+
+ if (bundle != null) {
+ props.put(AuditEvent.KEY_ID, Long.toString(bundle.getBundleId()));
+ }
+
+ String msg = null;
+ String type = null;
+ Throwable exception = event.getThrowable();
+ if (exception != null) {
+ msg = exception.getMessage();
+ type = exception.getClass().getName();
+ }
+
+ switch (event.getType()) {
+ case FrameworkEvent.INFO:
+ eventType = AuditEvent.FRAMEWORK_INFO;
+ if (msg != null) {
+ props.put(AuditEvent.KEY_MSG, msg);
+ }
+ if (type != null) {
+ props.put(AuditEvent.KEY_TYPE, type);
+ }
+ break;
+ case FrameworkEvent.WARNING:
+ eventType = AuditEvent.FRAMEWORK_WARNING;
+ if (msg != null) {
+ props.put(AuditEvent.KEY_MSG, msg);
+ }
+ if (type != null) {
+ props.put(AuditEvent.KEY_TYPE, type);
+ }
+ break;
+ case FrameworkEvent.ERROR:
+ eventType = AuditEvent.FRAMEWORK_ERROR;
+ if (msg != null) {
+ props.put(AuditEvent.KEY_MSG, msg);
+ }
+ if (type != null) {
+ props.put(AuditEvent.KEY_TYPE, type);
+ }
+ break;
+ case FrameworkEvent.PACKAGES_REFRESHED:
+ eventType = AuditEvent.FRAMEWORK_REFRESH;
+ break;
+ case FrameworkEvent.STARTED:
+ eventType = AuditEvent.FRAMEWORK_STARTED;
+ break;
+ case FrameworkEvent.STARTLEVEL_CHANGED:
+ eventType = AuditEvent.FRAMEWORK_STARTLEVEL;
+ break;
+ }
+ writeEvent(eventType, props);
+ }
+
+ @Override
+ public void bundleChanged(BundleEvent event) {
+ int eventType = AuditEvent.BUNDLE_BASE;
+ Map<String, String> props = new HashMap<String, String>();
+ Bundle bundle = event.getBundle();
+ props.put(AuditEvent.KEY_ID, Long.toString(bundle.getBundleId()));
+
+ switch (event.getType()) {
+ case BundleEvent.INSTALLED:
+ eventType = AuditEvent.BUNDLE_INSTALLED;
+ if (bundle.getSymbolicName() != null) {
+ props.put(AuditEvent.KEY_NAME, bundle.getSymbolicName());
+ }
+ String version = (String)
bundle.getHeaders().get(Constants.BUNDLE_VERSION);
+ if (version != null) {
+ props.put(AuditEvent.KEY_VERSION, version);
+ }
+ props.put(AuditEvent.KEY_LOCATION, bundle.getLocation());
+ break;
+ case BundleEvent.RESOLVED:
+ eventType = AuditEvent.BUNDLE_RESOLVED;
+ break;
+ case BundleEvent.STARTED:
+ eventType = AuditEvent.BUNDLE_STARTED;
+ break;
+ case BundleEvent.STOPPED:
+ eventType = AuditEvent.BUNDLE_STOPPED;
+ break;
+ case BundleEvent.UNRESOLVED:
+ eventType = AuditEvent.BUNDLE_UNRESOLVED;
+ break;
+ case BundleEvent.UPDATED:
+ eventType = AuditEvent.BUNDLE_UPDATED;
+ version = (String)
bundle.getHeaders().get(Constants.BUNDLE_VERSION);
+ if (version != null) {
+ props.put(AuditEvent.KEY_VERSION, version);
+ }
+ props.put(AuditEvent.KEY_LOCATION, bundle.getLocation());
+ break;
+ case BundleEvent.UNINSTALLED:
+ eventType = AuditEvent.BUNDLE_UNINSTALLED;
+ break;
+ case BundleEvent.STARTING:
+ eventType = AuditEvent.BUNDLE_STARTING;
+ break;
+ case BundleEvent.STOPPING:
+ eventType = AuditEvent.BUNDLE_STOPPING;
+ break;
+ }
+ writeEvent(eventType, props);
+ }
+
+ private void writeEvent(int eventType, Map<String, String> payload) {
+ FeedbackChannel channel =
m_agentControl.getFeedbackChannel(EVENTLOGGER_FEEDBACKCHANNEL);
+ if (channel != null) {
+ try {
+ channel.write(eventType, payload);
+ }
+ catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+}
Added:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
URL:
http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java?rev=1515342&view=auto
==============================================================================
---
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
(added)
+++
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
Mon Aug 19 10:23:36 2013
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.io.Writer;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.ace.agent.ConnectionHandler;
+import org.apache.ace.agent.FeedbackChannel;
+import org.apache.ace.agent.RetryAfterException;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.range.RangeIterator;
+import org.apache.ace.range.SortedRangeSet;
+
+/**
+ * FeedbackChannel implementation
+ *
+ */
+// TODO: rotate/truncate<br/>
+// TODO: test(coverage)<br/>
+// TODO: decouple from range/log API?
+public class FeedbackChannelImpl implements FeedbackChannel {
+
+ private static final String DIRECTORY_NAME = "feedback";
+ private static final String COMMAND_QUERY = "query";
+ private static final String COMMAND_SEND = "send";
+ private static final String PARAMETER_TARGETID = "tid";
+ private static final String PARAMETER_LOGID = "logid";
+
+ private final AgentContext m_agentContext;
+ private final String m_name;
+ private final File m_baseDir;
+ private final FileFilter m_fileFilter = new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.getName().startsWith(m_name);
+ }
+ };
+
+ private Store m_store = null;
+ private long m_highest;
+
+ public FeedbackChannelImpl(AgentContext agentContext, String name) throws
IOException {
+ m_agentContext = agentContext;
+ m_name = name;
+ m_baseDir = new File(m_agentContext.getWorkDir(), DIRECTORY_NAME);
+ if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs())
+ throw new IllegalArgumentException("Need valid dir");
+ initStore();
+ }
+
+ @Override
+ public synchronized void sendFeedback() throws RetryAfterException,
IOException {
+ String identification = getIdentification();
+ URL serverURL = getServerURL();
+ if (identification == null || serverURL == null)
+ return;
+ URLConnection sendConnection = null;
+ Writer writer = null;
+ try {
+ URL sendURL = new URL(serverURL, m_name + "/" + COMMAND_SEND);
+ sendConnection = getConnectionHandler().getConnection(sendURL);
+ sendConnection.setDoOutput(true);
+ if (sendConnection instanceof HttpURLConnection)
+ ((HttpURLConnection)
sendConnection).setChunkedStreamingMode(8192);
+ writer = new BufferedWriter(new
OutputStreamWriter(sendConnection.getOutputStream()));
+ SortedSet<Long> storeIDs = getStoreIDs();
+ for (Long storeID : storeIDs) {
+ URL queryURL = new URL(serverURL, m_name + "/" + COMMAND_QUERY
+ "?" + PARAMETER_TARGETID + "=" + identification + "&" + PARAMETER_LOGID + "="
+ storeID);
+ URLConnection queryConnection =
getConnectionHandler().getConnection(queryURL);
+ synchronizeStore(storeID, queryConnection.getInputStream(),
writer);
+ }
+ writer.flush();
+ sendConnection.getContent();
+ }
+ catch (ConnectException e) {
+ e.printStackTrace();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ finally {
+ if (writer != null)
+ writer.close();
+ if (sendConnection instanceof HttpURLConnection)
+ ((HttpURLConnection) sendConnection).disconnect();
+ }
+ }
+
+ @Override
+ public synchronized void write(int type, Map<String, String> properties)
throws IOException {
+ try {
+ LogEvent result = new LogEvent(null, m_store.getId(),
getNextEventID(), System.currentTimeMillis(), type,
mapToDictionary(properties));
+ m_store.append(result.getID(),
result.toRepresentation().getBytes());
+ }
+ catch (IOException ex) {
+ handleException(m_store, ex);
+ }
+ }
+
+ // TODO Is this called?
+ public synchronized void closeStore() throws IOException {
+ m_store.close();
+ m_store = null;
+ }
+
+ private void initStore() throws IOException {
+ SortedSet<Long> storeIDs = getStoreIDs();
+ if (storeIDs.isEmpty()) {
+ m_store = newFeedbackStore();
+ }
+ else {
+ m_store = createStore(storeIDs.last());
+ try {
+ m_store.init();
+ }
+ catch (IOException ex) {
+ handleException(m_store, ex);
+ }
+ }
+ }
+
+ private void synchronizeStore(long storeID, InputStream queryInput, Writer
sendWriter) throws IOException {
+ long highestLocal = getHighestEventID(storeID);
+ if (highestLocal == 0)
+ return;
+ SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
+ SortedRangeSet remoteRange =
getQueryDescriptor(queryInput).getRangeSet();
+ SortedRangeSet delta = remoteRange.diffDest(localRange);
+ RangeIterator rangeIterator = delta.iterator();
+ if (!rangeIterator.hasNext())
+ return;
+ String identification = getIdentification();
+ long lowest = rangeIterator.next();
+ long highest = delta.getHigh();
+ if (lowest <= highest) {
+ List<LogEvent> events = getEvents(storeID, lowest, highestLocal >
highest ? highest : highestLocal);
+ Iterator<LogEvent> iter = events.iterator();
+ while (iter.hasNext()) {
+ LogEvent current = (LogEvent) iter.next();
+ while ((current.getID() > lowest) && rangeIterator.hasNext()) {
+ lowest = rangeIterator.next();
+ }
+ if (current.getID() == lowest) {
+ LogEvent event = new LogEvent(identification, current);
+ sendWriter.write(event.toRepresentation());
+ sendWriter.write("\n");
+ }
+ }
+ }
+ }
+
+ private LogDescriptor getQueryDescriptor(InputStream queryInput) throws
IOException {
+ BufferedReader queryReader = null;
+ try {
+ queryReader = new BufferedReader(new
InputStreamReader(queryInput));
+ String rangeString = queryReader.readLine();
+ if (rangeString == null) {
+ throw new IOException("Could not construct LogDescriptor from
stream because stream is empty");
+ }
+ try {
+ return new LogDescriptor(rangeString);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IOException("Could not determine highest remote
event id, received malformed event range (" + rangeString + ")");
+ }
+ }
+ finally {
+ if (queryReader != null) {
+ try {
+ queryReader.close();
+ }
+ catch (Exception ex) {
+ // not much we can do
+ }
+ }
+ }
+ }
+
+ private Store newFeedbackStore() throws IOException {
+ long storeId = System.currentTimeMillis();
+ while (!(new File(m_baseDir, getStoreName(storeId))).createNewFile()) {
+ storeId++;
+ }
+ return new Store(new File(m_baseDir, getStoreName(storeId)), storeId);
+ }
+
+ private Store createStore(long storeId) throws IOException {
+ return new Store(new File(m_baseDir, getStoreName(storeId)), storeId);
+ }
+
+ private String getStoreName(long storeId) {
+ return m_name + "-" + storeId;
+ }
+
+ private long getStoreId(String storeName) {
+ return Long.parseLong(storeName.replace(m_name + "-", ""));
+ }
+
+ private List<LogEvent> getEvents(long storeID, long fromEventID, long
toEventID) throws IOException {
+ Store store = getStore(storeID);
+ List<LogEvent> result = new ArrayList<LogEvent>();
+ try {
+ if (store.getCurrent() > fromEventID) {
+ store.reset();
+ }
+ while (store.hasNext()) {
+ long eventID = store.readCurrentID();
+ if ((eventID >= fromEventID) && (eventID <= toEventID)) {
+ result.add(new LogEvent(new String(store.read())));
+ }
+ else {
+ store.skip();
+ }
+ }
+ }
+ catch (Exception ex) {
+ handleException(store, ex);
+ }
+ finally {
+ closeIfNeeded(store);
+ }
+ return result;
+ }
+
+ private void handleException(Store store, Exception exception) throws
IOException {
+ // System.err.println(LogService.LOG_WARNING, "Exception accessing the
log: "
+ // + store.getId(), exception);
+ if (store == m_store)
+ m_store = newFeedbackStore();
+
+ try {
+ store.truncate();
+ }
+ catch (IOException ex) {
+ // m_log.log(LogService.LOG_WARNING, "Exception during truncate: "
+ // + store.getId(), ex);
+ }
+ try {
+ store.close();
+ }
+ catch (IOException ex) {
+ // Not much we can do
+ }
+ if (exception instanceof IOException) {
+ throw (IOException) exception;
+ }
+ throw new IOException("Unable to read log entry: "
+ + exception.getMessage());
+ }
+
+ private File[] getStoreFiles() throws IOException {
+ File[] files = (File[]) m_baseDir.listFiles(m_fileFilter);
+ if (files == null)
+ throw new IOException("Unable to list store files in " +
m_baseDir.getAbsolutePath());
+ return files;
+ }
+
+ private SortedSet<Long> getStoreIDs() throws IOException {
+ File[] files = getStoreFiles();
+ SortedSet<Long> storeIDs = new TreeSet<Long>();
+ for (int i = 0; i < files.length; i++)
+ storeIDs.add(getStoreId(files[i].getName()));
+ return storeIDs;
+ }
+
+ private long getHighestEventID(long storeID) throws IOException {
+ Store store = getStore(storeID);
+ try {
+ if (m_highest == 0) {
+ store.init();
+ return (m_highest = store.getCurrent());
+ }
+ else {
+ return m_highest;
+ }
+ }
+ catch (IOException ex) {
+ handleException(store, ex);
+ }
+ finally {
+ closeIfNeeded(store);
+ }
+ return -1;
+ }
+
+ private void closeIfNeeded(Store store) {
+ if (store != m_store) {
+ try {
+ store.close();
+ }
+ catch (IOException ex) {
+ // Not much we can do;
+ }
+ }
+ }
+
+ private Store getStore(long storeID) throws IOException {
+ if (m_store.getId() == storeID) {
+ return m_store;
+ }
+ return createStore(storeID);
+ }
+
+ private long getNextEventID() throws IOException {
+ return (m_highest = getHighestEventID(m_store.m_id) + 1);
+ }
+
+ private ConnectionHandler getConnectionHandler() {
+ return m_agentContext.getConnectionHandler();
+ }
+
+ private String getIdentification() {
+ return m_agentContext.getIdentificationHandler().getIdentification();
+ }
+
+ private URL getServerURL() {
+ return m_agentContext.getDiscoveryHandler().getServerUrl();
+ }
+
+ // bridging to log api
+ private static Dictionary<String, String> mapToDictionary(Map<String,
String> map) {
+ Dictionary<String, String> dictionary = new Hashtable<String,
String>();
+ for (Entry<String, String> entry : map.entrySet()) {
+ dictionary.put(entry.getKey(), entry.getValue());
+ }
+ return dictionary;
+ }
+
+ /**
+ * The general idea is to provide easy access to a file of records. It
supports iterating over records both by
+ * skipping and by reading. Furthermore, files can be truncated. Most
methods will make an effort to reset to the
+ * last good record in case of an error -- hence, a call to truncate after
an IOException might make the store
+ * readable again.
+ */
+ static class Store {
+ private final RandomAccessFile m_store;
+ private final long m_id;
+ private long m_current;
+
+ /**
+ * Create a new File based Store.
+ *
+ * @param store
+ * the file to use as backend.
+ * @param id
+ * the log id of the store
+ * @throws java.io.IOException
+ * in case the file is not rw.
+ */
+ Store(File store, long id) throws IOException {
+ m_store = new RandomAccessFile(store, "rwd");
+ m_id = id;
+ }
+
+ /**
+ * Get the id of the current record.
+ *
+ * @return the idea of the current record.
+ */
+ public long getCurrent() throws IOException {
+ long pos = m_store.getFilePointer();
+ if (m_store.length() == 0) {
+ return 0;
+ }
+ long result = 0;
+ try {
+ m_store.seek(m_current);
+ result = readCurrentID();
+ m_store.seek(pos);
+ }
+ catch (IOException ex) {
+ handle(pos, ex);
+ }
+ return result;
+ }
+
+ /**
+ * Get the log id of this store.
+ *
+ * @return the log id of this store.
+ */
+ public long getId() {
+ return m_id;
+ }
+
+ /**
+ * Reset the store to the beginning of the records
+ *
+ * @throws java.io.IOException
+ * in case of an IO error.
+ */
+ public void reset() throws IOException {
+ m_store.seek(0);
+ m_current = 0;
+ }
+
+ /**
+ * Determine whether there are any records left based on the current
postion.
+ *
+ * @return <code>true</code> if there are still records to be read.
+ * @throws java.io.IOException
+ * in case of an IO error.
+ */
+ public boolean hasNext() throws IOException {
+ return m_store.getFilePointer() < m_store.length();
+ }
+
+ public byte[] read() throws IOException {
+ long pos = m_store.getFilePointer();
+ try {
+ if (pos < m_store.length()) {
+ long current = m_store.getFilePointer();
+ long id = m_store.readLong();
+ int next = m_store.readInt();
+ byte[] entry = new byte[next];
+ m_store.readFully(entry);
+ m_current = current;
+ return entry;
+ }
+ }
+ catch (IOException ex) {
+ handle(pos, ex);
+ }
+ return null;
+ }
+
+ public long readCurrentID() throws IOException {
+ long pos = m_store.getFilePointer();
+ try {
+ if (pos < m_store.length()) {
+ long id = m_store.readLong();
+ m_store.seek(pos);
+ return id;
+ }
+ }
+ catch (IOException ex) {
+ handle(pos, ex);
+ }
+ return -1;
+ }
+
+ /**
+ * Make sure the store is readable. As a result, the store is at the
end of the records.
+ *
+ * @throws java.io.IOException
+ * in case of any IO error.
+ */
+ public void init() throws IOException {
+ reset();
+ try {
+ while (true) {
+ skip();
+ }
+ }
+ catch (EOFException ex) {
+ // done
+ }
+ }
+
+ /**
+ * Skip the next record if there is any.
+ *
+ * @throws java.io.IOException
+ * in case of any IO error or if there is no record left.
+ */
+ public void skip() throws IOException {
+ long pos = m_store.getFilePointer();
+ try {
+ long id = m_store.readLong();
+ int next = m_store.readInt();
+ if (m_store.length() < next + m_store.getFilePointer()) {
+ throw new IOException("Unexpected end of file");
+ }
+ m_store.seek(m_store.getFilePointer() + next);
+ m_current = pos;
+ pos = m_store.getFilePointer();
+ }
+ catch (IOException ex) {
+ handle(pos, ex);
+ }
+ }
+
+ /**
+ * Store the given record data as the next record.
+ *
+ * @param entry
+ * the data of the record to store.
+ * @throws java.io.IOException
+ * in case of any IO error.
+ */
+ public void append(long id, byte[] entry) throws IOException {
+ long pos = m_store.getFilePointer();
+ try {
+ m_store.seek(m_store.length());
+ long current = m_store.getFilePointer();
+ m_store.writeLong(id);
+ m_store.writeInt(entry.length);
+ m_store.write(entry);
+ m_store.seek(pos);
+ }
+ catch (IOException ex) {
+ handle(pos, ex);
+ }
+ }
+
+ /**
+ * Try to truncate the store at the current record.
+ *
+ * @throws java.io.IOException
+ * in case of any IO error.
+ */
+ public void truncate() throws IOException {
+ m_store.setLength(m_store.getFilePointer());
+ }
+
+ /**
+ * Release any resources.
+ *
+ * @throws java.io.IOException
+ * in case of any IO error.
+ */
+ public void close() throws IOException {
+ m_store.close();
+ }
+
+ private void handle(long pos, IOException exception) throws
IOException {
+ try {
+ m_store.seek(pos);
+ }
+ catch (IOException ex) {
+ // m_log.log(LogService.LOG_WARNING, "Exception during seek!",
ex);
+ }
+ throw exception;
+ }
+ }
+}