http://git-wip-us.apache.org/repos/asf/flume/blob/97ed09e6/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
deleted file mode 100644
index f2dad6f..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import org.apache.flume.SourceRunner;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
-import org.apache.flume.source.SequenceGeneratorSource;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class TestFlumeNode {
-
-  private FlumeNode node;
-
-  @Before
-  public void setUp() {
-    node = new FlumeNode();
-
-    node.setName("test-node");
-    node.setNodeManager(new EmptyLogicalNodeManager());
-  }
-
-  @Ignore("Fails given recent changes to configuration system")
-  @Test
-  public void testLifecycle() throws InterruptedException, LifecycleException {
-    node.start();
-    boolean reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.START_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
-
-    node.stop();
-    reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.STOP_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
-  }
-
-  @Ignore("Fails given recent changes to configuration system")
-  @Test
-  public void testAddNodes() throws InterruptedException, LifecycleException {
-    node.start();
-    boolean reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.START_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
-
-    SourceRunner n1 = SourceRunner.forSource(new SequenceGeneratorSource());
-
-    node.getNodeManager().add(n1);
-
-    node.stop();
-    reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.STOP_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
-  }
-
-  public static class EmptyLogicalNodeManager extends
-      AbstractLogicalNodeManager {
-
-    private LifecycleState lifecycleState;
-
-    public EmptyLogicalNodeManager() {
-      lifecycleState = LifecycleState.IDLE;
-    }
-
-    @Override
-    public void start() {
-      lifecycleState = LifecycleState.START;
-    }
-
-    @Override
-    public void stop() {
-      lifecycleState = LifecycleState.STOP;
-    }
-
-    @Override
-    public LifecycleState getLifecycleState() {
-      return lifecycleState;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/97ed09e6/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
deleted file mode 100644
index f759af1..0000000
--- 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("Causes blocking with no method for clean shutdown")
-public class TestFlumeNodeApplication {
-
-  @Test
-  public void testApplication() {
-    String[] args = new String[] {};
-
-    Application.main(args);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/97ed09e6/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..eed22ee
--- /dev/null
+++ 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Files;
+
+public class TestPollingPropertiesFileConfigurationProvider  {
+
+
+  private static final File TESTFILE = new File(
+      TestPollingPropertiesFileConfigurationProvider.class.getClassLoader()
+          .getResource("flume-conf.properties").getFile());
+
+  private PollingPropertiesFileConfigurationProvider provider;
+  private File baseDir;
+  private File configFile;
+  private EventBus eventBus;
+
+  @Before
+  public void setUp() throws Exception {
+
+    baseDir = Files.createTempDir();
+
+    configFile = new File(baseDir, TESTFILE.getName());
+    Files.copy(TESTFILE, configFile);
+
+    eventBus = new EventBus("test");
+    provider =
+        new PollingPropertiesFileConfigurationProvider("host1",
+            configFile, eventBus, 1);
+    provider.start();
+    LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(baseDir);
+    provider.stop();
+  }
+
+  @Test
+  public void testPolling() throws Exception {
+
+    // let first event fire
+    Thread.sleep(2000L);
+
+    final List<MaterializedConfiguration> events = Lists.newArrayList();
+
+    Object eventHandler = new Object() {
+      @Subscribe
+      public synchronized void 
handleConfigurationEvent(MaterializedConfiguration event) {
+        events.add(event);
+      }
+    };
+    eventBus.register(eventHandler);
+    configFile.setLastModified(System.currentTimeMillis());
+
+    // now wait for second event to fire
+    Thread.sleep(2000L);
+
+    Assert.assertEquals(String.valueOf(events), 1, events.size());
+
+    MaterializedConfiguration materializedConfiguration = events.remove(0);
+
+    Assert.assertEquals(1, 
materializedConfiguration.getSourceRunners().size());
+    Assert.assertEquals(1, materializedConfiguration.getSinkRunners().size());
+    Assert.assertEquals(1, materializedConfiguration.getChannels().size());
+
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/97ed09e6/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..84a8cfd
--- /dev/null
+++ 
b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class TestPropertiesFileConfigurationProvider  {
+
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(TestPropertiesFileConfigurationProvider.class);
+
+  private static final File TESTFILE = new File(
+      TestPropertiesFileConfigurationProvider.class.getClassLoader()
+          .getResource("flume-conf.properties").getFile());
+
+  private PropertiesFileConfigurationProvider provider;
+
+  @Before
+  public void setUp() throws Exception {
+    provider = new PropertiesFileConfigurationProvider("test", TESTFILE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+
+  }
+
+  @Test
+  public void testPropertyRead() throws Exception {
+
+    FlumeConfiguration configuration = provider.getFlumeConfiguration();
+    Assert.assertNotNull(configuration);
+
+    /*
+     * Test the known errors in the file
+     */
+    List<String> expected = Lists.newArrayList();
+    expected.add("host5 CONFIG_ERROR");
+    expected.add("host5 INVALID_PROPERTY");
+    expected.add("host4 CONFIG_ERROR");
+    expected.add("host4 CONFIG_ERROR");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 AGENT_CONFIGURATION_INVALID");
+    expected.add("ch2 ATTRS_MISSING");
+    expected.add("host3 CONFIG_ERROR");
+    expected.add("host3 PROPERTY_VALUE_NULL");
+    expected.add("host3 AGENT_CONFIGURATION_INVALID");
+    expected.add("host2 PROPERTY_VALUE_NULL");
+    expected.add("host2 AGENT_CONFIGURATION_INVALID");
+    List<String> actual = Lists.newArrayList();
+    for(FlumeConfigurationError error : 
configuration.getConfigurationErrors()) {
+      actual.add(error.getComponentName() + " " + 
error.getErrorType().toString());
+    }
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+
+
+    AgentConfiguration agentConfiguration =
+        configuration.getConfigurationFor("host1");
+    Assert.assertNotNull(agentConfiguration);
+
+
+    LOGGER.info(agentConfiguration.getPrevalidationConfig());
+    LOGGER.info(agentConfiguration.getPostvalidationConfig());
+
+
+    Set<String> sources = Sets.newHashSet("source1");
+    Set<String> sinks = Sets.newHashSet("sink1");
+    Set<String> channels = Sets.newHashSet("channel1");
+
+    Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+    Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+    Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/97ed09e6/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
 
b/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
deleted file mode 100644
index 41e2f35..0000000
--- 
a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.source;
-
-import org.apache.flume.EventDeliveryException;
-
-public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource {
-
-  @Override
-  public Status process() throws EventDeliveryException {
-
-    if (Math.round(Math.random()) == 1) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        // Do nothing.
-      }
-
-      throw new EventDeliveryException("I'm broken!");
-    } else {
-      return super.process();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/97ed09e6/flume-ng-node/src/test/resources/flume-conf.properties
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/resources/flume-conf.properties 
b/flume-ng-node/src/test/resources/flume-conf.properties
index 2b74d4c..23cace9 100644
--- a/flume-ng-node/src/test/resources/flume-conf.properties
+++ b/flume-ng-node/src/test/resources/flume-conf.properties
@@ -23,43 +23,22 @@
 # host2, host3 etc.
 #
 
-host1.sources = avroSource syslogSource
-host1.channels = jdbcChannel memChannel
-host1.sinks = hdfsSink
+host1.sources = source1
+host1.channels = channel1
+host1.sinks = sink1
 
 # avroSource configuration
-host1.sources.avroSource.type = avro
-host1.sources.avroSource.bind = 127.0.0.1
-host1.sources.avroSource.port = 11001
-host1.sources.avroSource.channels = jdbcChannel
-
-# syslogSource configuration
-host1.sources.syslogSource.type = syslogtcp
-host1.sources.syslogSource.port = 13231
-host1.sources.syslogSource.channels = jdbcChannel memChannel
-host1.sources.syslogSource.selector.type = multiplexing
-host1.sources.syslogSource.selector.header = my.selector.header
-host1.sources.syslogSource.selector.mapping.all = jdbcChannel memChannel
-host1.sources.syslogSource.selector.mapping.persist = jdbcChannel
-host1.sources.syslogSource.selector.default = memChannel
-
-# jdbcChannel configuration
-host1.channels.jdbcChannel.type = jdbc
-host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
-host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
-host1.channels.jdbcChannel.jdbc.username = flume
-host1.channels.jdbcChannel.jdbc.password = flume
-
-# memChannel configuration
-host1.channels.memChannel.type = memory
-host1.channels.memChannel.capacity = 10000
+host1.sources.source1.type = seq
+host1.sources.source1.channels = channel1
+
+# memChannel1 configuration
+host1.channels.channel1.type = memory
+host1.channels.channel1.capacity = 10000
+
 
 # hdfsSink configuration
-host1.sinks.hdfsSink.type = hdfs
-host1.sinks.hdfsSink.namenode = hdfs://localhost/
-host1.sinks.hdfsSink.batchsize = 1000
-host1.sinks.hdfsSink.runner.type = polling
-host1.sinks.hdfsSink.runner.polling.interval = 60
+host1.sinks.sink1.type = null
+host1.sinks.sink1.channel = channel1
 
 #
 # Agent configuration for host2 - invalid because channels is not

Reply via email to