FLUME-1408. Log uncaught Throwables thrown within Executors. (Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8d9770e9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8d9770e9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8d9770e9 Branch: refs/heads/cdh-1.2.0+24_intuit Commit: 8d9770e96735f33b051c443dcd94545bdf583cb0 Parents: be92478 Author: Hari <[email protected]> Authored: Mon Jul 30 20:58:57 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 7 13:08:31 2012 -0700 ---------------------------------------------------------------------- .../flume/instrumentation/GangliaServer.java | 73 ++++---- .../flume/lifecycle/LifecycleSupervisor.java | 148 +++++++------- .../file/AbstractFileConfigurationProvider.java | 3 + .../org/apache/flume/sink/hdfs/BucketWriter.java | 10 +- .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 1 - 5 files changed, 123 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/8d9770e9/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java index 1104141..d93cd33 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java @@ -37,7 +37,6 @@ import javax.management.AttributeList; import javax.management.MBeanAttributeInfo; import javax.management.MBeanServer; import javax.management.ObjectInstance; -import javax.management.ObjectName; import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.api.HostInfo; @@ -332,45 +331,49 @@ public class GangliaServer implements MonitorService { @Override public void run() { - Set<ObjectInstance> queryMBeans = null; try { - queryMBeans = mbeanServer.queryMBeans( - null, null); - } catch (Exception ex) { - logger.error("Could not get Mbeans for monitoring", ex); - Throwables.propagate(ex); - } - for (ObjectInstance obj : queryMBeans) { + Set<ObjectInstance> queryMBeans = null; try { - if (!obj.getObjectName().toString().startsWith("org.apache.flume")) { - continue; - } - MBeanAttributeInfo[] attrs = mbeanServer. - getMBeanInfo(obj.getObjectName()).getAttributes(); - String strAtts[] = new String[attrs.length]; - for (int i = 0; i < strAtts.length; i++) { - strAtts[i] = attrs[i].getName(); - } - AttributeList attrList = mbeanServer.getAttributes( - obj.getObjectName(), strAtts); - String component = obj.getObjectName().toString().substring( - obj.getObjectName().toString().indexOf('=') + 1); - for (Object attr : attrList) { - Attribute localAttr = (Attribute) attr; - if (isGanglia3) { - server.createGangliaMessage(GANGLIA_CONTEXT + component + "." - + localAttr.getName(), - localAttr.getValue().toString()); - } else { - server.createGangliaMessage31(GANGLIA_CONTEXT + component + "." - + localAttr.getName(), - localAttr.getValue().toString()); + queryMBeans = mbeanServer.queryMBeans( + null, null); + } catch (Exception ex) { + logger.error("Could not get Mbeans for monitoring", ex); + Throwables.propagate(ex); + } + for (ObjectInstance obj : queryMBeans) { + try { + if (!obj.getObjectName().toString().startsWith("org.apache.flume")) { + continue; + } + MBeanAttributeInfo[] attrs = mbeanServer. + getMBeanInfo(obj.getObjectName()).getAttributes(); + String strAtts[] = new String[attrs.length]; + for (int i = 0; i < strAtts.length; i++) { + strAtts[i] = attrs[i].getName(); } - server.sendToGangliaNodes(); + AttributeList attrList = mbeanServer.getAttributes( + obj.getObjectName(), strAtts); + String component = obj.getObjectName().toString().substring( + obj.getObjectName().toString().indexOf('=') + 1); + for (Object attr : attrList) { + Attribute localAttr = (Attribute) attr; + if (isGanglia3) { + server.createGangliaMessage(GANGLIA_CONTEXT + component + "." + + localAttr.getName(), + localAttr.getValue().toString()); + } else { + server.createGangliaMessage31(GANGLIA_CONTEXT + component + "." + + localAttr.getName(), + localAttr.getValue().toString()); + } + server.sendToGangliaNodes(); + } + } catch (Exception ex) { + logger.error("Error getting mbean attributes", ex); } - } catch (Exception ex) { - logger.error("Error getting mbean attributes", ex); } + } catch(Throwable t) { + logger.error("Unexpected error", t); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/8d9770e9/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java index 2ac94df..78eda05 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java @@ -201,94 +201,94 @@ public class LifecycleSupervisor implements LifecycleAware { long now = System.currentTimeMillis(); - if (supervisoree.status.firstSeen == null) { - logger.debug("first time seeing {}", lifecycleAware); + try { + if (supervisoree.status.firstSeen == null) { + logger.debug("first time seeing {}", lifecycleAware); - supervisoree.status.firstSeen = now; - } - - supervisoree.status.lastSeen = now; - synchronized (lifecycleAware) { - if (supervisoree.status.discard) { - // Unsupervise has already been called on this. - logger.info("Component has already been stopped {}", lifecycleAware); - return; - } else if(supervisoree.status.error) { - logger.info("Component {} is in error state, and Flume will not" + - "attempt to change its state", lifecycleAware); - return; + supervisoree.status.firstSeen = now; } - supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); + supervisoree.status.lastSeen = now; + synchronized (lifecycleAware) { + if (supervisoree.status.discard) { + // Unsupervise has already been called on this. + logger.info("Component has already been stopped {}", lifecycleAware); + return; + } else if (supervisoree.status.error) { + logger.info("Component {} is in error state, and Flume will not" + + "attempt to change its state", lifecycleAware); + return; + } + + supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); - if (!lifecycleAware.getLifecycleState().equals( - supervisoree.status.desiredState)) { + if (!lifecycleAware.getLifecycleState().equals( + supervisoree.status.desiredState)) { - logger - .debug("Want to transition {} from {} to {} (failures:{})", - new Object[] { lifecycleAware, - supervisoree.status.lastSeenState, + logger.debug("Want to transition {} from {} to {} (failures:{})", + new Object[] { lifecycleAware, supervisoree.status.lastSeenState, supervisoree.status.desiredState, supervisoree.status.failures }); - switch (supervisoree.status.desiredState) { - case START: - try { - lifecycleAware.start(); - } catch (Throwable e) { - logger.error("Unable to start " + lifecycleAware - + " - Exception follows.", e); - if(e instanceof Error){ - //This component can never recover, shut it down. - supervisoree.status.desiredState = LifecycleState.STOP; - try{ - lifecycleAware.stop(); - logger.warn("Component {} stopped, since it could not be" + - "successfully started due to missing dependencies", - lifecycleAware); - } catch (Throwable e1) { - logger.error("Unsuccessful attempt to " + - "shutdown component: {} due to missing dependencies." + - " Please shutdown the agent" + - "or disable this component, or the agent will be" + - "in an undefined state.", e1); - supervisoree.status.error = true; - if(e1 instanceof Error){ - throw (Error)e1; + switch (supervisoree.status.desiredState) { + case START: + try { + lifecycleAware.start(); + } catch (Throwable e) { + logger.error("Unable to start " + lifecycleAware + + " - Exception follows.", e); + if (e instanceof Error) { + // This component can never recover, shut it down. + supervisoree.status.desiredState = LifecycleState.STOP; + try { + lifecycleAware.stop(); + logger.warn("Component {} stopped, since it could not be" + + "successfully started due to missing dependencies", + lifecycleAware); + } catch (Throwable e1) { + logger.error("Unsuccessful attempt to " + + "shutdown component: {} due to missing dependencies." + + " Please shutdown the agent" + + "or disable this component, or the agent will be" + + "in an undefined state.", e1); + supervisoree.status.error = true; + if (e1 instanceof Error) { + throw (Error) e1; + } + // Set the state to stop, so that the conf poller can + // proceed. + } + } + supervisoree.status.failures++; + } + break; + case STOP: + try { + lifecycleAware.stop(); + } catch (Throwable e) { + logger.error("Unable to stop " + lifecycleAware + + " - Exception follows.", e); + if (e instanceof Error) { + throw (Error) e; + } + supervisoree.status.failures++; } - //Set the state to stop, so that the conf poller can - //proceed. - } + break; + default: + logger.warn("I refuse to acknowledge {} as a desired state", + supervisoree.status.desiredState); } - supervisoree.status.failures++; - } - break; - case STOP: - try { - lifecycleAware.stop(); - } catch (Throwable e) { - logger.error("Unable to stop " + lifecycleAware - + " - Exception follows.", e); - if(e instanceof Error) { - throw (Error)e; + + if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) { + logger.error( + "Policy {} of {} has been violated - supervisor should exit!", + supervisoree.policy, lifecycleAware); } - supervisoree.status.failures++; } - break; - default: - logger.warn("I refuse to acknowledge {} as a desired state", - supervisoree.status.desiredState); - } - - if (!supervisoree.policy.isValid( - lifecycleAware, supervisoree.status)) { - logger.error( - "Policy {} of {} has been violated - supervisor should exit!", - supervisoree.policy, lifecycleAware); } + } catch(Throwable t) { + logger.error("Unexpected error", t); } - } - logger.debug("Status check complete"); } } http://git-wip-us.apache.org/repos/asf/flume/blob/8d9770e9/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java index 9f900d3..a2c882b 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java @@ -206,6 +206,9 @@ public abstract class AbstractFileConfigurationProvider implements } catch (NoClassDefFoundError e) { logger.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); + } catch (Throwable t) { + // caught because the caller does not handle or log Throwables + logger.error("Unhandled error", t); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/8d9770e9/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 75ff069..6408eb9 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -40,6 +40,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Throwables; + /** * Internal API intended for HDFSSink use. * This class does file rolling and handles file formats and serialization. @@ -199,7 +201,7 @@ class BucketWriter { if (ex instanceof IOException) { throw (IOException) ex; } else { - throw new IOException(ex); + throw Throwables.propagate(ex); } } } @@ -213,7 +215,11 @@ class BucketWriter { public Void call() throws Exception { LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", bucketPath + IN_USE_EXT, rollInterval); - close(); + try { + close(); + } catch(Throwable t) { + LOG.error("Unexpected error", t); + } return null; } }; http://git-wip-us.apache.org/repos/asf/flume/blob/8d9770e9/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index d65c5a8..fcb9642 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -460,7 +460,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) { LOG.info("Closing {}", entry.getKey()); - final BucketWriter callableWriter = entry.getValue(); try { close(entry.getValue()); } catch (Exception ex) {
