Updated Branches: refs/heads/trunk d20c94ca6 -> 5b636c123
FLUME-1777. AbstractSource does not provide enough implementation for sub-classes (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5b636c12 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5b636c12 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5b636c12 Branch: refs/heads/trunk Commit: 5b636c1237f51dc0a8a9c2b589747e0477dbb5fc Parents: d20c94c Author: Hari Shreedharan <[email protected]> Authored: Thu Dec 13 12:05:24 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 13 12:05:24 2012 -0800 ---------------------------------------------------------------------- .../flume/source/AbstractEventDrivenSource.java | 38 ++++ .../flume/source/AbstractPollableSource.java | 58 ++++++ .../apache/flume/source/BasicSourceSemantics.java | 148 +++++++++++++++ .../org/apache/flume/source/http/HTTPSource.java | 2 +- .../flume/source/TestAbstractPollableSource.java | 64 +++++++ .../flume/source/TestBasicSourceSemantics.java | 121 ++++++++++++ 6 files changed, 430 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5b636c12/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java new file mode 100644 index 0000000..89bd357 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import org.apache.flume.EventDrivenSource; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; + +/** + * Base class which ensures sub-classes will inherit all the properties + * of BasicSourceSemantics. Adds no additional functionality and is provided + * for completeness sake. + */ [email protected] [email protected] +public abstract class AbstractEventDrivenSource extends BasicSourceSemantics + implements EventDrivenSource { + + public AbstractEventDrivenSource() { + super(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5b636c12/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java new file mode 100644 index 0000000..356f4d4 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.PollableSource; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; + +/** + * Base class which ensures sub-classes will inherit all the properties + * of BasicSourceSemantics in addition to: + * <ol> + * <li>Ensuring when configure/start throw an exception process will not + * be called</li> + * <li>Ensure that process will not be called unless configure and start + * have successfully been called</li> + * </ol> + */ [email protected] [email protected] +public abstract class AbstractPollableSource extends BasicSourceSemantics + implements PollableSource { + + public AbstractPollableSource() { + super(); + } + public Status process() throws EventDeliveryException { + Exception exception = getStartException(); + if (exception != null) { + throw new FlumeException("Source had error configuring or starting", + exception); + } + if(!isStarted()) { + throw new EventDeliveryException("Source is not started"); + } + return doProcess(); + } + + protected abstract Status doProcess() throws EventDeliveryException; +} http://git-wip-us.apache.org/repos/asf/flume/blob/5b636c12/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java new file mode 100644 index 0000000..d2672b5 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import org.apache.flume.Context; +import org.apache.flume.FlumeException; +import org.apache.flume.Source; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurable; +import org.apache.flume.lifecycle.LifecycleState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +/** + * Alternative to AbstractSource, which: + * <ol> + * <li>Ensure configure cannot be called while started</li> + * <li>Exceptions thrown during configure, start, stop put source in ERROR state</li> + * <li>Exceptions thrown during start, stop will be logged but not re-thrown.</li> + * <li>Exception in configure disables starting</li> + * </ol> + */ [email protected] [email protected] +public abstract class BasicSourceSemantics implements Source, Configurable { + private static final Logger logger = LoggerFactory + .getLogger(BasicSourceSemantics.class); + private Exception exception; + private ChannelProcessor channelProcessor; + private String name; + private LifecycleState lifecycleState; + + public BasicSourceSemantics() { + lifecycleState = LifecycleState.IDLE; + } + @Override + public synchronized void configure(Context context) { + if(isStarted()) { + throw new IllegalStateException("Configure called when started"); + } else { + try { + exception = null; + setLifecycleState(LifecycleState.IDLE); + doConfigure(context); + } catch (Exception e) { + exception = e; + setLifecycleState(LifecycleState.ERROR); + // causes source to be removed by configuration code + Throwables.propagate(e); + } + } + } + @Override + public synchronized void start() { + if (exception != null) { + logger.error(String.format("Cannot start due to error: name = %s", + getName()), exception); + } else { + try { + Preconditions.checkState(channelProcessor != null, + "No channel processor configured"); + doStart(); + setLifecycleState(LifecycleState.START); + } catch (Exception e) { + logger.error(String.format( + "Unexpected error performing start: name = %s", getName()), e); + exception = e; + setLifecycleState(LifecycleState.ERROR); + } + } + } + @Override + public synchronized void stop() { + try { + doStop(); + setLifecycleState(LifecycleState.STOP); + } catch (Exception e) { + logger.error(String.format( + "Unexpected error performing stop: name = %s", getName()), e); + setLifecycleState(LifecycleState.ERROR); + } + } + @Override + public synchronized void setChannelProcessor(ChannelProcessor cp) { + channelProcessor = cp; + } + + @Override + public synchronized ChannelProcessor getChannelProcessor() { + return channelProcessor; + } + @Override + public synchronized void setName(String name) { + this.name = name; + } + + @Override + public synchronized String getName() { + return name; + } + + @Override + public synchronized LifecycleState getLifecycleState() { + return lifecycleState; + } + + public String toString() { + return this.getClass().getName() + "{name:" + name + ",state:" + + lifecycleState +"}"; + } + + protected boolean isStarted() { + return getLifecycleState() == LifecycleState.START; + } + /** + * @return Exception thrown during configure() or start() + */ + protected Exception getStartException() { + return exception; + } + protected synchronized void setLifecycleState(LifecycleState lifecycleState) { + this.lifecycleState = lifecycleState; + } + protected abstract void doConfigure(Context context) throws FlumeException; + protected abstract void doStart() throws FlumeException; + protected abstract void doStop() throws FlumeException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/5b636c12/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java index d4d818a..b46dc0e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java @@ -179,7 +179,7 @@ public class HTTPSource extends AbstractSource implements getChannelProcessor().processEventBatch(events); } catch (ChannelException ex) { LOG.warn("Error appending event to channel. " - + "Channel might be full. Consider increasing the channel" + + "Channel might be full. Consider increasing the channel " + "capacity or make sure the sinks perform faster.", ex); response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Error appending event to channel. Channel might be full." http://git-wip-us.apache.org/repos/asf/flume/blob/5b636c12/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java new file mode 100644 index 0000000..02a2f0c --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source; +import static org.mockito.Mockito.*; + +import org.apache.flume.Context; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.junit.Before; +import org.junit.Test; + +public class TestAbstractPollableSource { + + private AbstractPollableSource source; + + @Before + public void setUp() { + source = spy(new AbstractPollableSource() { + @Override + protected Status doProcess() throws EventDeliveryException { + return Status.BACKOFF; + } + @Override + protected void doConfigure(Context context) throws FlumeException { + throw new FlumeException("dummy"); + } + @Override + protected void doStart() throws FlumeException { + + } + @Override + protected void doStop() throws FlumeException { + + } + }); + } + + @Test(expected = FlumeException.class) + public void testExceptionStartup() throws Exception { + source.configure(new Context()); + } + @Test(expected = EventDeliveryException.class) + public void testNotStarted() throws Exception { + source.process(); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/5b636c12/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java new file mode 100644 index 0000000..9227ef8 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source; +import static org.mockito.Mockito.*; + +import org.apache.flume.Context; +import org.apache.flume.FlumeException; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.lifecycle.LifecycleState; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestBasicSourceSemantics { + + private BasicSourceSemantics source; + private ChannelProcessor channelProcessor; + private Context context; + + @Before + public void setUp() { + context = new Context(); + channelProcessor = mock(ChannelProcessor.class); + } + public DoNothingSource spyAndConfigure(DoNothingSource source) { + source = spy(source); + source.setChannelProcessor(channelProcessor); + source.configure(context); + return source; + } + @Test + public void testDoConfigureThrowsException() throws Exception { + source = spy(new DoNothingSource() { + @Override + protected void doConfigure(Context context) throws FlumeException { + throw new FlumeException("dummy"); + } + }); + source.setChannelProcessor(channelProcessor); + try { + source.configure(context); + Assert.fail(); + } catch (FlumeException expected) { + + } + Assert.assertFalse(source.isStarted()); + Assert.assertEquals(LifecycleState.ERROR, source.getLifecycleState()); + Assert.assertNotNull(source.getStartException()); + } + @Test + public void testDoStartThrowsException() throws Exception { + source = spyAndConfigure(new DoNothingSource() { + @Override + protected void doStart() throws FlumeException { + throw new FlumeException("dummy"); + } + }); + source.start(); + Assert.assertFalse(source.isStarted()); + Assert.assertEquals(LifecycleState.ERROR, source.getLifecycleState()); + Assert.assertNotNull(source.getStartException()); + } + @Test + public void testDoStopThrowsException() throws Exception { + source = spyAndConfigure(new DoNothingSource() { + @Override + protected void doStop() throws FlumeException { + throw new FlumeException("dummy"); + } + }); + source.start(); + source.stop(); + Assert.assertFalse(source.isStarted()); + Assert.assertEquals(LifecycleState.ERROR, source.getLifecycleState()); + Assert.assertNull(source.getStartException()); + } + @Test + public void testConfigureCalledWhenStarted() throws Exception { + source = spyAndConfigure(new DoNothingSource()); + source.start(); + try { + source.configure(context); + Assert.fail(); + } catch (IllegalStateException expected) { + + } + Assert.assertTrue(source.isStarted()); + Assert.assertNull(source.getStartException()); + } + private static class DoNothingSource extends BasicSourceSemantics { + @Override + protected void doConfigure(Context context) throws FlumeException { + + } + @Override + protected void doStart() throws FlumeException { + + } + @Override + protected void doStop() throws FlumeException { + + } + } +} \ No newline at end of file
