http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java deleted file mode 100644 index f2dad6f..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node; - -import org.apache.flume.SourceRunner; -import org.apache.flume.lifecycle.LifecycleController; -import org.apache.flume.lifecycle.LifecycleException; -import org.apache.flume.lifecycle.LifecycleState; -import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager; -import org.apache.flume.source.SequenceGeneratorSource; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -public class TestFlumeNode { - - private FlumeNode node; - - @Before - public void setUp() { - node = new FlumeNode(); - - node.setName("test-node"); - node.setNodeManager(new EmptyLogicalNodeManager()); - } - - @Ignore("Fails given recent changes to configuration system") - @Test - public void testLifecycle() throws InterruptedException, LifecycleException { - node.start(); - boolean reached = LifecycleController.waitForOneOf(node, - LifecycleState.START_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.START, node.getLifecycleState()); - - node.stop(); - reached = LifecycleController.waitForOneOf(node, - LifecycleState.STOP_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState()); - } - - @Ignore("Fails given recent changes to configuration system") - @Test - public void testAddNodes() throws InterruptedException, LifecycleException { - node.start(); - boolean reached = LifecycleController.waitForOneOf(node, - LifecycleState.START_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.START, node.getLifecycleState()); - - SourceRunner n1 = SourceRunner.forSource(new SequenceGeneratorSource()); - - node.getNodeManager().add(n1); - - node.stop(); - reached = LifecycleController.waitForOneOf(node, - LifecycleState.STOP_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState()); - } - - public static class EmptyLogicalNodeManager extends - AbstractLogicalNodeManager { - - private LifecycleState lifecycleState; - - public EmptyLogicalNodeManager() { - lifecycleState = LifecycleState.IDLE; - } - - @Override - public void start() { - lifecycleState = LifecycleState.START; - } - - @Override - public void stop() { - lifecycleState = LifecycleState.STOP; - } - - @Override - public LifecycleState getLifecycleState() { - return lifecycleState; - } - - } - -}
http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java deleted file mode 100644 index f759af1..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node; - -import org.junit.Ignore; -import org.junit.Test; - -@Ignore("Causes blocking with no method for clean shutdown") -public class TestFlumeNodeApplication { - - @Test - public void testApplication() { - String[] args = new String[] {}; - - Application.main(args); - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java new file mode 100644 index 0000000..eed22ee --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.node; + +import java.io.File; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.flume.lifecycle.LifecycleController; +import org.apache.flume.lifecycle.LifecycleState; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.google.common.io.Files; + +public class TestPollingPropertiesFileConfigurationProvider { + + + private static final File TESTFILE = new File( + TestPollingPropertiesFileConfigurationProvider.class.getClassLoader() + .getResource("flume-conf.properties").getFile()); + + private PollingPropertiesFileConfigurationProvider provider; + private File baseDir; + private File configFile; + private EventBus eventBus; + + @Before + public void setUp() throws Exception { + + baseDir = Files.createTempDir(); + + configFile = new File(baseDir, TESTFILE.getName()); + Files.copy(TESTFILE, configFile); + + eventBus = new EventBus("test"); + provider = + new PollingPropertiesFileConfigurationProvider("host1", + configFile, eventBus, 1); + provider.start(); + LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR); + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(baseDir); + provider.stop(); + } + + @Test + public void testPolling() throws Exception { + + // let first event fire + Thread.sleep(2000L); + + final List<MaterializedConfiguration> events = Lists.newArrayList(); + + Object eventHandler = new Object() { + @Subscribe + public synchronized void handleConfigurationEvent(MaterializedConfiguration event) { + events.add(event); + } + }; + eventBus.register(eventHandler); + configFile.setLastModified(System.currentTimeMillis()); + + // now wait for second event to fire + Thread.sleep(2000L); + + Assert.assertEquals(String.valueOf(events), 1, events.size()); + + MaterializedConfiguration materializedConfiguration = events.remove(0); + + Assert.assertEquals(1, materializedConfiguration.getSourceRunners().size()); + Assert.assertEquals(1, materializedConfiguration.getSinkRunners().size()); + Assert.assertEquals(1, materializedConfiguration.getChannels().size()); + + + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java new file mode 100644 index 0000000..84a8cfd --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.node; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; +import org.apache.flume.conf.FlumeConfigurationError; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class TestPropertiesFileConfigurationProvider { + + + private static final Logger LOGGER = LoggerFactory + .getLogger(TestPropertiesFileConfigurationProvider.class); + + private static final File TESTFILE = new File( + TestPropertiesFileConfigurationProvider.class.getClassLoader() + .getResource("flume-conf.properties").getFile()); + + private PropertiesFileConfigurationProvider provider; + + @Before + public void setUp() throws Exception { + provider = new PropertiesFileConfigurationProvider("test", TESTFILE); + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testPropertyRead() throws Exception { + + FlumeConfiguration configuration = provider.getFlumeConfiguration(); + Assert.assertNotNull(configuration); + + /* + * Test the known errors in the file + */ + List<String> expected = Lists.newArrayList(); + expected.add("host5 CONFIG_ERROR"); + expected.add("host5 INVALID_PROPERTY"); + expected.add("host4 CONFIG_ERROR"); + expected.add("host4 CONFIG_ERROR"); + expected.add("host4 PROPERTY_VALUE_NULL"); + expected.add("host4 PROPERTY_VALUE_NULL"); + expected.add("host4 PROPERTY_VALUE_NULL"); + expected.add("host4 AGENT_CONFIGURATION_INVALID"); + expected.add("ch2 ATTRS_MISSING"); + expected.add("host3 CONFIG_ERROR"); + expected.add("host3 PROPERTY_VALUE_NULL"); + expected.add("host3 AGENT_CONFIGURATION_INVALID"); + expected.add("host2 PROPERTY_VALUE_NULL"); + expected.add("host2 AGENT_CONFIGURATION_INVALID"); + List<String> actual = Lists.newArrayList(); + for(FlumeConfigurationError error : configuration.getConfigurationErrors()) { + actual.add(error.getComponentName() + " " + error.getErrorType().toString()); + } + Collections.sort(expected); + Collections.sort(actual); + Assert.assertEquals(expected, actual); + + + AgentConfiguration agentConfiguration = + configuration.getConfigurationFor("host1"); + Assert.assertNotNull(agentConfiguration); + + + LOGGER.info(agentConfiguration.getPrevalidationConfig()); + LOGGER.info(agentConfiguration.getPostvalidationConfig()); + + + Set<String> sources = Sets.newHashSet("source1"); + Set<String> sinks = Sets.newHashSet("sink1"); + Set<String> channels = Sets.newHashSet("channel1"); + + Assert.assertEquals(sources, agentConfiguration.getSourceSet()); + Assert.assertEquals(sinks, agentConfiguration.getSinkSet()); + Assert.assertEquals(channels, agentConfiguration.getChannelSet()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java deleted file mode 100644 index 41e2f35..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.source; - -import org.apache.flume.EventDeliveryException; - -public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource { - - @Override - public Status process() throws EventDeliveryException { - - if (Math.round(Math.random()) == 1) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Do nothing. - } - - throw new EventDeliveryException("I'm broken!"); - } else { - return super.process(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/resources/flume-conf.properties ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/resources/flume-conf.properties b/flume-ng-node/src/test/resources/flume-conf.properties index 2b74d4c..23cace9 100644 --- a/flume-ng-node/src/test/resources/flume-conf.properties +++ b/flume-ng-node/src/test/resources/flume-conf.properties @@ -23,43 +23,22 @@ # host2, host3 etc. # -host1.sources = avroSource syslogSource -host1.channels = jdbcChannel memChannel -host1.sinks = hdfsSink +host1.sources = source1 +host1.channels = channel1 +host1.sinks = sink1 # avroSource configuration -host1.sources.avroSource.type = avro -host1.sources.avroSource.bind = 127.0.0.1 -host1.sources.avroSource.port = 11001 -host1.sources.avroSource.channels = jdbcChannel - -# syslogSource configuration -host1.sources.syslogSource.type = syslogtcp -host1.sources.syslogSource.port = 13231 -host1.sources.syslogSource.channels = jdbcChannel memChannel -host1.sources.syslogSource.selector.type = multiplexing -host1.sources.syslogSource.selector.header = my.selector.header -host1.sources.syslogSource.selector.mapping.all = jdbcChannel memChannel -host1.sources.syslogSource.selector.mapping.persist = jdbcChannel -host1.sources.syslogSource.selector.default = memChannel - -# jdbcChannel configuration -host1.channels.jdbcChannel.type = jdbc -host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver -host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb -host1.channels.jdbcChannel.jdbc.username = flume -host1.channels.jdbcChannel.jdbc.password = flume - -# memChannel configuration -host1.channels.memChannel.type = memory -host1.channels.memChannel.capacity = 10000 +host1.sources.source1.type = seq +host1.sources.source1.channels = channel1 + +# memChannel1 configuration +host1.channels.channel1.type = memory +host1.channels.channel1.capacity = 10000 + # hdfsSink configuration -host1.sinks.hdfsSink.type = hdfs -host1.sinks.hdfsSink.namenode = hdfs://localhost/ -host1.sinks.hdfsSink.batchsize = 1000 -host1.sinks.hdfsSink.runner.type = polling -host1.sinks.hdfsSink.runner.polling.interval = 60 +host1.sinks.sink1.type = null +host1.sinks.sink1.channel = channel1 # # Agent configuration for host2 - invalid because channels is not
