Repository: flume Updated Branches: refs/heads/trunk 580f78134 -> e4312ad1a
FLUME-199. Remove hardcoded ports from unit tests Many unit tests use hardcoded port numbers which leads to flakiness and causes problems when running builds in parallel. This patch fixes this issue by searching for available ports instead of the hardcoded ones. This closes #124 Reviewers: Miklos Csanady, Ferenc Szabo (Andras Beni via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e4312ad1 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e4312ad1 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e4312ad1 Branch: refs/heads/trunk Commit: e4312ad1acfd28e23405e6775374948fa35d5e40 Parents: 580f781 Author: Andras Beni <[email protected]> Authored: Thu Oct 26 15:31:04 2017 -0700 Committer: Denes Arvay <[email protected]> Committed: Thu Oct 26 15:31:04 2017 -0700 ---------------------------------------------------------------------- .../TestLoadBalancingLog4jAppender.java | 73 +++++----- .../log4jappender/TestLog4jAppender.java | 11 +- .../TestLog4jAppenderWithAvro.java | 14 +- ...e-loadbalancing-backoff-log4jtest.properties | 2 +- ...flume-loadbalancing-rnd-log4jtest.properties | 2 +- .../flume-loadbalancinglog4jtest.properties | 2 +- .../flume-log4jtest-avro-generic.properties | 2 +- .../flume-log4jtest-avro-reflect.properties | 2 +- .../test/resources/flume-log4jtest.properties | 2 +- .../http/TestHTTPMetricsServer.java | 32 ++--- .../org/apache/flume/sink/TestAvroSink.java | 11 +- .../org/apache/flume/sink/TestThriftSink.java | 9 +- .../org/apache/flume/source/TestAvroSource.java | 138 +++++++------------ .../source/TestMultiportSyslogTCPSource.java | 26 ++-- .../apache/flume/source/TestNetcatSource.java | 44 +++--- .../apache/flume/source/TestThriftSource.java | 13 +- .../flume/agent/embedded/TestEmbeddedAgent.java | 11 +- .../source/avroLegacy/TestLegacyAvroSource.java | 48 +++---- .../flume-thrift-source/pom.xml | 4 +- .../thriftLegacy/TestThriftLegacySource.java | 38 ++--- .../apache/flume/source/TestNetcatSource.java | 36 ++--- .../apache/flume/api/TestThriftRpcClient.java | 11 +- .../apache/flume/sink/http/TestHttpSinkIT.java | 18 ++- .../apache/flume/sink/kafka/util/TestUtil.java | 49 +++---- .../source/kafka/KafkaSourceEmbeddedKafka.java | 13 +- .../apache/flume/test/agent/TestRpcClient.java | 7 +- .../TestRpcClientCommunicationFailure.java | 6 +- .../apache/flume/test/util/StagedInstall.java | 14 +- .../test/resources/rpc-client-test.properties | 2 +- 29 files changed, 312 insertions(+), 328 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java index ec5d6df..0546c23 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java @@ -18,9 +18,10 @@ */ package org.apache.flume.clients.log4jappender; -import java.io.File; -import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; +import org.apache.commons.lang.StringUtils; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -64,6 +66,25 @@ public class TestLoadBalancingLog4jAppender { private Logger fixture; private boolean slowDown = false; + private static List<Integer> getFreePorts(int numberOfPorts) throws IOException { + List<Integer> ports = new ArrayList<>(numberOfPorts); + for (int index = 0; index < numberOfPorts; ++index) { + try (ServerSocket socket = new ServerSocket(0)) { + ports.add(socket.getLocalPort()); + } + } + return ports; + } + + private static String toHostList(List<Integer> ports) { + List<String> addresses = new ArrayList<String>(ports.size()); + for (Integer port : ports) { + addresses.add("localhost:" + port); + } + String hostList = StringUtils.join(addresses, " "); + return hostList; + } + @Before public void initiate() throws InterruptedException { ch = new MemoryChannel(); @@ -92,10 +113,8 @@ public class TestLoadBalancingLog4jAppender { public void testLog4jAppenderRoundRobin() throws IOException { int numberOfMsgs = 1000; int expectedPerSource = 500; - File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancinglog4jtest.properties").getFile()); - startSources(TESTFILE, false, new int[] { 25430, 25431 }); + String propertiesFile = "flume-loadbalancinglog4jtest.properties"; + startSources(propertiesFile, false, getFreePorts(2)); sendAndAssertMessages(numberOfMsgs); @@ -107,12 +126,8 @@ public class TestLoadBalancingLog4jAppender { @Test public void testLog4jAppenderRandom() throws IOException { int numberOfMsgs = 1000; - File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancing-rnd-log4jtest.properties").getFile()); - startSources(TESTFILE, false, new int[] { 25430, 25431, 25432, 25433, - 25434, - 25435, 25436, 25437, 25438, 25439 }); + String propertiesFile = "flume-loadbalancing-rnd-log4jtest.properties"; + startSources(propertiesFile, false, getFreePorts(10)); sendAndAssertMessages(numberOfMsgs); @@ -129,11 +144,8 @@ public class TestLoadBalancingLog4jAppender { @Test public void testRandomBackoff() throws Exception { - File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancing-backoff-log4jtest.properties") - .getFile()); - startSources(TESTFILE, false, new int[] { 25430, 25431, 25432 }); + String propertiesFile = "flume-loadbalancing-backoff-log4jtest.properties"; + startSources(propertiesFile, false, getFreePorts(3)); sources.get(0).setFail(); sources.get(2).setFail(); @@ -163,11 +175,8 @@ public class TestLoadBalancingLog4jAppender { @Test public void testRandomBackoffUnsafeMode() throws Exception { - File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancing-backoff-log4jtest.properties") - .getFile()); - startSources(TESTFILE, true, new int[]{25430, 25431, 25432}); + String propertiesFile = "flume-loadbalancing-backoff-log4jtest.properties"; + startSources(propertiesFile, true, getFreePorts(3)); sources.get(0).setFail(); sources.get(1).setFail(); @@ -178,15 +187,12 @@ public class TestLoadBalancingLog4jAppender { @Test (expected = EventDeliveryException.class) public void testTimeout() throws Throwable { - File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancinglog4jtest.properties") - .getFile()); + String propertiesFile = "flume-loadbalancinglog4jtest.properties"; ch = new TestLog4jAppender.SlowMemoryChannel(2000); configureChannel(); slowDown = true; - startSources(TESTFILE, false, new int[]{25430, 25431, 25432}); + startSources(propertiesFile, false, getFreePorts(3)); int level = 20000; String msg = "This is log message number" + String.valueOf(level); try { @@ -194,16 +200,12 @@ public class TestLoadBalancingLog4jAppender { } catch (FlumeException ex) { throw ex.getCause(); } - } @Test(expected = EventDeliveryException.class) public void testRandomBackoffNotUnsafeMode() throws Throwable { - File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancing-backoff-log4jtest.properties") - .getFile()); - startSources(TESTFILE, false, new int[]{25430, 25431, 25432}); + String propertiesFile = "flume-loadbalancing-backoff-log4jtest.properties"; + startSources(propertiesFile, false, getFreePorts(3)); sources.get(0).setFail(); sources.get(1).setFail(); @@ -272,7 +274,7 @@ public class TestLoadBalancingLog4jAppender { } - private void startSources(File log4jProps, boolean unsafeMode, int... ports) + private void startSources(String log4jProps, boolean unsafeMode, List<Integer> ports) throws IOException { for (int port : ports) { CountingAvroSource source = new CountingAvroSource(port); @@ -294,9 +296,10 @@ public class TestLoadBalancingLog4jAppender { // log4j setup is completed before the @Before calls also. // This will cause the test to fail even before it starts! - FileReader reader = new FileReader(log4jProps); + Reader reader = new InputStreamReader(getClass().getResourceAsStream("/" + log4jProps)); Properties props = new Properties(); props.load(reader); + props.setProperty("log4j.appender.out2.Hosts", toHostList(ports)); props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(unsafeMode)); if (slowDown) { http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java index 25698c5..3f8ba37 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java @@ -21,6 +21,8 @@ package org.apache.flume.clients.log4jappender; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -58,9 +60,15 @@ public class TestLog4jAppender { private Channel ch; private Properties props; + private static int getFreePort() throws Exception { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + @Before public void initiate() throws Exception { - int port = 25430; + int port = getFreePort(); source = Mockito.spy(new AvroSource()); ch = new MemoryChannel(); Configurables.configure(ch, new Context()); @@ -76,6 +84,7 @@ public class TestLog4jAppender { FileReader reader = new FileReader(TESTFILE); props = new Properties(); props.load(reader); + props.put("log4j.appender.out2.Port", String.valueOf(port)); reader.close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java index 7c2a964..9ccebc8 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java @@ -23,6 +23,7 @@ import com.google.common.io.Resources; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.net.ServerSocket; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; @@ -59,6 +60,13 @@ public class TestLog4jAppenderWithAvro { private AvroSource source; private Channel ch; private Properties props; + private int port; + + private static int getFreePort() throws Exception { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } @Before public void setUp() throws Exception { @@ -66,7 +74,7 @@ public class TestLog4jAppenderWithAvro { Files.copy(Resources.newInputStreamSupplier(schemaUrl), new File("/tmp/myrecord.avsc")); - int port = 25430; + port = getFreePort(); source = new AvroSource(); ch = new MemoryChannel(); Configurables.configure(ch, new Context()); @@ -99,6 +107,7 @@ public class TestLog4jAppenderWithAvro { @Test public void testAvroGeneric() throws IOException { loadProperties("flume-log4jtest-avro-generic.properties"); + props.put("log4j.appender.out2.Port", String.valueOf(port)); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppenderWithAvro.class); String msg = "This is log message number " + String.valueOf(0); @@ -136,6 +145,7 @@ public class TestLog4jAppenderWithAvro { @Test public void testAvroReflect() throws IOException { loadProperties("flume-log4jtest-avro-reflect.properties"); + props.put("log4j.appender.out2.Port", String.valueOf(port)); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppenderWithAvro.class); String msg = "This is log message number " + String.valueOf(0); @@ -173,6 +183,7 @@ public class TestLog4jAppenderWithAvro { @Test public void testDifferentEventTypesInBatchWithAvroReflect() throws IOException { loadProperties("flume-log4jtest-avro-reflect.properties"); + props.put("log4j.appender.out2.Port", String.valueOf(port)); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(getClass()); List<Object> events = Arrays.asList("string", new AppEvent("appEvent")); @@ -203,6 +214,7 @@ public class TestLog4jAppenderWithAvro { @Test public void testDifferentEventTypesInBatchWithAvroGeneric() throws IOException { loadProperties("flume-log4jtest-avro-generic.properties"); + props.put("log4j.appender.out2.Port", String.valueOf(port)); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(getClass()); String msg = "Avro log message"; http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties index 6e8235e..6f11e17 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender -log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432 +#log4j.appender.out2.Hosts = Set from java source log4j.appender.out2.Selector = ROUND_ROBIN log4j.appender.out2.MaxBackoff = 30000 log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties index fd43d19..753feb6 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties @@ -15,6 +15,6 @@ # specific language governing permissions and limitations # under the License. log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender -log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432 localhost:25433 localhost:25434 localhost:25435 localhost:25436 localhost:25437 localhost:25438 localhost:25439 +#log4j.appender.out2.Hosts = Set from java source log4j.appender.out2.Selector = RANDOM log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties index 618e504..cfa4e11 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties @@ -15,5 +15,5 @@ # specific language governing permissions and limitations # under the License. log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender -log4j.appender.out2.Hosts = localhost:25430 localhost:25431 +#log4j.appender.out2.Hosts = Set from java source log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties index ffdab8b..92d7d16 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender -log4j.appender.out2.Port = 25430 +#log4j.appender.out2.Port = Set from java source log4j.appender.out2.Hostname = localhost log4j.appender.out2.AvroSchemaUrl = file:///tmp/myrecord.avsc log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties index b50ffcc..06a0c68 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender -log4j.appender.out2.Port = 25430 +#log4j.appender.out2.Port = Set from java source log4j.appender.out2.Hostname = localhost log4j.appender.out2.AvroReflectionEnabled = true log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest.properties ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest.properties index 6575c0e..97dcab5 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest.properties +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest.properties @@ -15,6 +15,6 @@ # specific language governing permissions and limitations # under the License. log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender -log4j.appender.out2.Port = 25430 +#log4j.appender.out2.Port = Set from java source log4j.appender.out2.Hostname = localhost log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java index bf0bf0e..e39b0bd 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java @@ -38,12 +38,10 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.lang.reflect.Type; import java.net.HttpURLConnection; +import java.net.ServerSocket; import java.net.URL; import java.util.Map; -/** - * - */ public class TestHTTPMetricsServer { Channel memChannel = new MemoryChannel(); @@ -51,6 +49,12 @@ public class TestHTTPMetricsServer { Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType(); Gson gson = new Gson(); + private static int getFreePort() throws Exception { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + @Test public void testJSON() throws Exception { memChannel.setName("memChannel"); @@ -87,10 +91,7 @@ public class TestHTTPMetricsServer { txn2.commit(); txn2.close(); - testWithPort(5467); - testWithPort(33434); - testWithPort(44343); - testWithPort(0); + testWithPort(getFreePort()); memChannel.stop(); pmemChannel.stop(); } @@ -98,11 +99,7 @@ public class TestHTTPMetricsServer { private void testWithPort(int port) throws Exception { MonitorService srv = new HTTPMetricsServer(); Context context = new Context(); - if (port > 1024) { - context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port)); - } else { - port = HTTPMetricsServer.DEFAULT_PORT; - } + context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port)); srv.configure(context); srv.start(); Thread.sleep(1000); @@ -126,26 +123,21 @@ public class TestHTTPMetricsServer { Assert.assertNotNull(pmemBean); JMXTestUtils.checkChannelCounterParams(pmemBean); srv.stop(); - System.out.println(String.valueOf(port) + "test success!"); } @Test public void testTrace() throws Exception { - doTestForbiddenMethods(4543,"TRACE"); + doTestForbiddenMethods(getFreePort(),"TRACE"); } @Test public void testOptions() throws Exception { - doTestForbiddenMethods(4432,"OPTIONS"); + doTestForbiddenMethods(getFreePort(),"OPTIONS"); } public void doTestForbiddenMethods(int port, String method) throws Exception { MonitorService srv = new HTTPMetricsServer(); Context context = new Context(); - if (port > 1024) { - context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port)); - } else { - port = HTTPMetricsServer.DEFAULT_PORT; - } + context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port)); srv.configure(context); srv.start(); Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java index 0f3f9ec..fdb1772 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java @@ -59,6 +59,7 @@ import javax.net.ssl.SSLEngine; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.nio.charset.Charset; import java.security.KeyStore; import java.security.Security; @@ -72,7 +73,15 @@ public class TestAvroSink { private static final Logger logger = LoggerFactory .getLogger(TestAvroSink.class); private static final String hostname = "127.0.0.1"; - private static final Integer port = 41414; + private static final Integer port; + + static { + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } catch (IOException e) { + throw new AssertionError("Cannot find free port", e); + } + } private AvroSink sink; private Channel channel; http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java index 22dcf98..687c635 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java @@ -40,8 +40,9 @@ import org.junit.Test; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; + +import java.net.ServerSocket; import java.nio.charset.Charset; -import java.util.Random; import java.util.concurrent.atomic.AtomicLong; public class TestThriftSink { @@ -51,14 +52,14 @@ public class TestThriftSink { private String hostname; private int port; - private final Random random = new Random(); - @Before public void setUp() throws Exception { sink = new ThriftSink(); channel = new MemoryChannel(); hostname = "0.0.0.0"; - port = random.nextInt(50000) + 1024; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } Context context = new Context(); context.put("hostname", hostname); http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index e7e2fab..fcfc72c 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -22,6 +22,7 @@ package org.apache.flume.source; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.cert.X509Certificate; @@ -94,26 +95,16 @@ public class TestAvroSource { } @Test - public void testLifecycle() throws InterruptedException { - boolean bound = false; + public void testLifecycle() throws InterruptedException, IOException { - for (int i = 0; i < 100 && !bound; i++) { + Context context = new Context(); - Context context = new Context(); + context.put("port", String.valueOf(selectedPort = getFreePort())); + context.put("bind", "0.0.0.0"); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - // Invalid configuration may throw a FlumeException which has to be expected in the callers - Configurables.configure(source, context); - try { - source.start(); - bound = true; - } catch (FlumeException e) { - /* - * NB: This assume the failure is to bind. - */ - } - } + Configurables.configure(source, context); + + source.start(); Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( @@ -232,29 +223,21 @@ public class TestAvroSource { private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, int compressionLevel) throws InterruptedException, IOException { - boolean bound = false; - for (int i = 0; i < 100 && !bound; i++) { - Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - context.put("threads", "50"); - if (serverEnableCompression) { - context.put("compression-type", "deflate"); - } else { - context.put("compression-type", "none"); - } - Configurables.configure(source, context); - try { - source.start(); - bound = true; - } catch (FlumeException e) { - /* - * NB: This assume the failure is to bind. - */ - } + Context context = new Context(); + context.put("port", String.valueOf(selectedPort = getFreePort())); + context.put("bind", "0.0.0.0"); + context.put("threads", "50"); + if (serverEnableCompression) { + context.put("compression-type", "deflate"); + } else { + context.put("compression-type", "none"); } + Configurables.configure(source, context); + + source.start(); + Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( source, LifecycleState.START_OR_ERROR)); @@ -307,6 +290,12 @@ public class TestAvroSource { source.getLifecycleState()); } + private static int getFreePort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + private static class CompressionChannelFactory extends NioClientSocketChannelFactory { private int compressionLevel; @@ -332,28 +321,17 @@ public class TestAvroSource { @Test public void testSslRequest() throws InterruptedException, IOException { - boolean bound = false; - for (int i = 0; i < 10 && !bound; i++) { - Context context = new Context(); + Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - context.put("ssl", "true"); - context.put("keystore", "src/test/resources/server.p12"); - context.put("keystore-password", "password"); - context.put("keystore-type", "PKCS12"); - Configurables.configure(source, context); - try { - source.start(); - bound = true; - } catch (FlumeException e) { - /* - * NB: This assume the failure is to bind. - */ - Thread.sleep(100); - } - } + context.put("port", String.valueOf(selectedPort = getFreePort())); + context.put("bind", "0.0.0.0"); + context.put("ssl", "true"); + context.put("keystore", "src/test/resources/server.p12"); + context.put("keystore-password", "password"); + context.put("keystore-type", "PKCS12"); + Configurables.configure(source, context); + source.start(); Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( @@ -523,37 +501,23 @@ public class TestAvroSource { public void doIpFilterTest(InetAddress dest, String ruleDefinition, boolean eventShouldBeAllowed, boolean testWithSSL) throws InterruptedException, IOException { - boolean bound = false; - - for (int i = 0; i < 100 && !bound; i++) { - - Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - context.put("ipFilter", "true"); - if (ruleDefinition != null) { - context.put("ipFilterRules", ruleDefinition); - } - if (testWithSSL) { - logger.info("Client testWithSSL" + testWithSSL); - context.put("ssl", "true"); - context.put("keystore", "src/test/resources/server.p12"); - context.put("keystore-password", "password"); - context.put("keystore-type", "PKCS12"); - } - // Invalid configuration may result in a FlumeException - Configurables.configure(source, context); - - try { - source.start(); - bound = true; - } catch (FlumeException e) { - /* - * NB: This assume the failure is to bind. - */ - Thread.sleep(100); - } + Context context = new Context(); + context.put("port", String.valueOf(selectedPort = getFreePort())); + context.put("bind", "0.0.0.0"); + context.put("ipFilter", "true"); + if (ruleDefinition != null) { + context.put("ipFilterRules", ruleDefinition); } + if (testWithSSL) { + logger.info("Client testWithSSL" + testWithSSL); + context.put("ssl", "true"); + context.put("keystore", "src/test/resources/server.p12"); + context.put("keystore-password", "password"); + context.put("keystore-type", "PKCS12"); + } + // Invalid configuration may result in a FlumeException + Configurables.configure(source, context); + source.start(); Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java index 56c7881..9a6c5f4 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java @@ -24,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.UnknownHostException; @@ -56,17 +57,17 @@ import org.apache.mina.transport.socket.nio.NioSession; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.mockito.Mockito.*; public class TestMultiportSyslogTCPSource { - private static final Logger logger = - LoggerFactory.getLogger(TestMultiportSyslogTCPSource.class); + private static final int getFreePort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } - private static final int BASE_TEST_SYSLOG_PORT = 14455; private final DateTime time = new DateTime(); private final String stamp1 = time.toString(); private final String host1 = "localhost.localdomain"; @@ -105,9 +106,18 @@ public class TestMultiportSyslogTCPSource { source.setChannelProcessor(new ChannelProcessor(rcs)); Context context = new Context(); + + List<Integer> portList = new ArrayList<>(1000); + while (portList.size() < 1000) { + int port = getFreePort(); + if (!portList.contains(port)) { + portList.add(port); + } + } + StringBuilder ports = new StringBuilder(); for (int i = 0; i < 1000; i++) { - ports.append(String.valueOf(BASE_TEST_SYSLOG_PORT + i)).append(" "); + ports.append(String.valueOf(portList.get(i))).append(" "); } context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS, ports.toString().trim()); @@ -117,7 +127,7 @@ public class TestMultiportSyslogTCPSource { Socket syslogSocket; for (int i = 0; i < 1000 ; i++) { syslogSocket = new Socket( - InetAddress.getLocalHost(), BASE_TEST_SYSLOG_PORT + i); + InetAddress.getLocalHost(), portList.get(i)); syslogSocket.getOutputStream().write(getEvent(i)); syslogSocket.close(); } @@ -161,7 +171,7 @@ public class TestMultiportSyslogTCPSource { Assert.assertEquals(host1, host2); if (port != null) { - int num = port - BASE_TEST_SYSLOG_PORT; + int num = portList.indexOf(port); Assert.assertEquals(data1 + " " + String.valueOf(num), new String(e.getBody())); } http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java index c1205c7..7db8983 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -33,7 +33,6 @@ import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; -import org.jboss.netty.channel.ChannelException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; +import java.net.ServerSocket; import java.net.InetSocketAddress; import java.net.Socket; import java.net.UnknownHostException; @@ -55,6 +55,14 @@ public class TestNetcatSource { private static final Logger logger = LoggerFactory.getLogger(TestAvroSource.class); + + private static int getFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new AssertionError("Can not find free port.", e); + } + } /** * Five first sentences of the Fables "The Crow and the Fox" * written by Jean de La Fontaine, French poet. @@ -341,30 +349,18 @@ public class TestNetcatSource { private void startSource(String encoding, String ack, String batchSize, String maxLineLength) throws InterruptedException { - boolean bound = false; - - for (int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 10500 + i)); - context.put("bind", "0.0.0.0"); - context.put("ack-every-event", ack); - context.put("encoding", encoding); - context.put("batch-size", batchSize); - context.put("max-line-length", maxLineLength); - - Configurables.configure(source, context); - - source.start(); - bound = true; - } catch (ChannelException e) { - /* - * NB: This assume we're using the Netty server under the hood and the - * failure is to bind. Yucky. - */ - } - } + Context context = new Context(); + context.put("port", String.valueOf(selectedPort = getFreePort())); + context.put("bind", "0.0.0.0"); + context.put("ack-every-event", ack); + context.put("encoding", encoding); + context.put("batch-size", batchSize); + context.put("max-line-length", maxLineLength); + + Configurables.configure(source, context); + + source.start(); Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR)); Assert.assertEquals("Server is started", LifecycleState.START, http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java index cdaefaf..f7c6361 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java @@ -43,12 +43,14 @@ import org.junit.Test; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.KeyManagerFactory; + +import java.io.IOException; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,13 +60,14 @@ public class TestThriftSource { private ThriftSource source; private MemoryChannel channel; private RpcClient client; - private final Random random = new Random(); private final Properties props = new Properties(); private int port; @Before - public void setUp() { - port = random.nextInt(50000) + 1024; + public void setUp() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } props.clear(); props.setProperty("hosts", "h1"); props.setProperty("hosts.h1", "0.0.0.0:" + String.valueOf(port)); @@ -252,7 +255,7 @@ public class TestThriftSource { context.put(ThriftSource.CONFIG_PORT, String.valueOf(port)); Configurables.configure(source, context); source.start(); - ExecutorCompletionService<Void> completionService = new ExecutorCompletionService(submitter); + ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(submitter); for (int i = 0; i < 30; i++) { completionService.submit(new SubmitHelper(i), null); } http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java index 032a4f8..5bb7cf1 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java @@ -225,17 +225,8 @@ public class TestEmbeddedAgent { } private static int findFreePort() throws IOException { - ServerSocket socket = null; - try { - socket = new ServerSocket(0); + try (ServerSocket socket = new ServerSocket(0)) { return socket.getLocalPort(); - } finally { - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - } - } } } } http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java b/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java index 610aa64..211c22b 100644 --- a/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java +++ b/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java @@ -36,7 +36,6 @@ import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; -import org.jboss.netty.channel.ChannelException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -44,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.ServerSocket; import java.net.URL; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -61,7 +61,7 @@ public class TestLegacyAvroSource { private Channel channel; @Before - public void setUp() { + public void setUp() throws Exception { source = new AvroLegacySource(); channel = new MemoryChannel(); @@ -74,27 +74,23 @@ public class TestLegacyAvroSource { rcs.setChannels(channels); source.setChannelProcessor(new ChannelProcessor(rcs)); + + try (ServerSocket socket = new ServerSocket(0)) { + selectedPort = socket.getLocalPort(); + } + } @Test public void testLifecycle() throws InterruptedException { - boolean bound = false; + Context context = new Context(); - for (int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); + context.put("port", String.valueOf(selectedPort)); + context.put("host", "0.0.0.0"); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("host", "0.0.0.0"); + Configurables.configure(source, context); - Configurables.configure(source, context); - - source.start(); - bound = true; - } catch (ChannelException e) { - // Assume port in use, try another one - } - } + source.start(); Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( @@ -111,24 +107,14 @@ public class TestLegacyAvroSource { @Test public void testRequest() throws InterruptedException, IOException { - boolean bound = false; - int i; - - for (i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); + Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("host", "0.0.0.0"); + context.put("port", String.valueOf(selectedPort)); + context.put("host", "0.0.0.0"); - Configurables.configure(source, context); + Configurables.configure(source, context); - source.start(); - bound = true; - } catch (ChannelException e) { - // Assume port in use, try another one - } - } + source.start(); Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-legacy-sources/flume-thrift-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-thrift-source/pom.xml b/flume-ng-legacy-sources/flume-thrift-source/pom.xml index 91cd562..194e41e 100644 --- a/flume-ng-legacy-sources/flume-thrift-source/pom.xml +++ b/flume-ng-legacy-sources/flume-thrift-source/pom.xml @@ -108,8 +108,8 @@ limitations under the License. <artifactId>maven-compiler-plugin</artifactId> <version>${mvn-compiler-plugin.version}</version> <configuration> - <source>1.6</source> - <target>1.6</target> + <source>1.7</source> + <target>1.7</target> <excludes> <exclude>**/generated-sources/**</exclude> </excludes> http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java b/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java index f228dde..56636b1 100644 --- a/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java +++ b/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java @@ -26,7 +26,6 @@ import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; -import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; @@ -43,23 +42,17 @@ import org.apache.thrift.transport.TTransportException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.ServerSocket; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -//EventStatus.java Priority.java ThriftFlumeEvent.java ThriftFlumeEventServer.java - public class TestThriftLegacySource { - private static final Logger logger = LoggerFactory - .getLogger(ThriftLegacySource.class); - private int selectedPort; private ThriftLegacySource source; private Channel channel; @@ -91,7 +84,7 @@ public class TestThriftLegacySource { } @Before - public void setUp() { + public void setUp() throws Exception { source = new ThriftLegacySource(); channel = new MemoryChannel(); @@ -104,26 +97,21 @@ public class TestThriftLegacySource { rcs.setChannels(channels); source.setChannelProcessor(new ChannelProcessor(rcs)); + + try (ServerSocket socket = new ServerSocket(0)) { + selectedPort = socket.getLocalPort(); + } } private void bind() throws InterruptedException { - boolean bound = false; + Context context = new Context(); - for (int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); - - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("host", "0.0.0.0"); + context.put("port", String.valueOf(selectedPort)); + context.put("host", "0.0.0.0"); - Configurables.configure(source, context); + Configurables.configure(source, context); - source.start(); - bound = true; - } catch (FlumeException e) { - // Assume port in use, try another one - } - } + source.start(); Assert .assertTrue("Reached start or error", LifecycleController.waitForOneOf( @@ -150,7 +138,7 @@ public class TestThriftLegacySource { public void testRequest() throws InterruptedException, IOException { bind(); - Map flumeMap = new HashMap<CharSequence, ByteBuffer>(); + Map<String, ByteBuffer> flumeMap = new HashMap<>(); ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent( 1, Priority.INFO, ByteBuffer.wrap("foo".getBytes()), 0, "fooHost", flumeMap); @@ -175,7 +163,7 @@ public class TestThriftLegacySource { public void testHeaders() throws InterruptedException, IOException { bind(); - Map flumeHeaders = new HashMap<CharSequence, ByteBuffer>(); + Map<String, ByteBuffer> flumeHeaders = new HashMap<>(); flumeHeaders.put("hello", ByteBuffer.wrap("world".getBytes("UTF-8"))); ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent( 1, Priority.INFO, ByteBuffer.wrap("foo".getBytes()), http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java index a597a31..7da1819 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -26,7 +26,6 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.EventDrivenSource; -import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; @@ -46,6 +45,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.Writer; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.nio.channels.Channels; import java.nio.channels.SocketChannel; import java.util.Arrays; @@ -70,11 +70,19 @@ public class TestNetcatSource { } @Parameters - public static Collection data() { + public static Collection<?> data() { Object[][] data = new Object[][] { { true }, { false } }; return Arrays.asList(data); } + private static int getFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new AssertionError("Can not open socket", e); + } + } + @Before public void setUp() { logger.info("Running setup"); @@ -96,24 +104,18 @@ public class TestNetcatSource { public void testLifecycle() throws InterruptedException, LifecycleException, EventDeliveryException { + final int port = getFreePort(); + ExecutorService executor = Executors.newFixedThreadPool(3); - boolean bound = false; - for (int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); - context.put("bind", "0.0.0.0"); - context.put("port", "41414"); - context.put("ack-every-event", String.valueOf(ackEveryEvent)); + Context context = new Context(); + context.put("bind", "0.0.0.0"); + context.put("port", String.valueOf(port)); + context.put("ack-every-event", String.valueOf(ackEveryEvent)); - Configurables.configure(source, context); + Configurables.configure(source, context); - source.start(); - bound = true; - } catch (FlumeException e) { - // assume port in use, try another one - } - } + source.start(); Runnable clientRequestRunnable = new Runnable() { @@ -121,7 +123,7 @@ public class TestNetcatSource { public void run() { try { SocketChannel clientChannel = SocketChannel - .open(new InetSocketAddress(41414)); + .open(new InetSocketAddress(port)); Writer writer = Channels.newWriter(clientChannel, "utf-8"); BufferedReader reader = new BufferedReader( http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java index b03fc8d..2eee0ef 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java @@ -26,13 +26,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -43,13 +43,14 @@ public class TestThriftRpcClient { private final Properties props = new Properties(); ThriftRpcClient client; ThriftTestingSource src; - private final Random random = new Random(); int port; @Before public void setUp() throws Exception { props.setProperty("hosts", "h1"); - port = random.nextInt(40000) + 1024; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift"); props.setProperty("hosts.h1", "0.0.0.0:" + String.valueOf(port)); props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10"); @@ -147,7 +148,7 @@ public class TestThriftRpcClient { try { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR.name(), port, ThriftRpcClient.COMPACT_PROTOCOL); - client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + ".0", port); + client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0.0", port); insertEvents(client, 2); //2 events } catch (EventDeliveryException ex) { Assert.assertEquals("Failed to send event. ", ex.getMessage()); @@ -170,7 +171,7 @@ public class TestThriftRpcClient { public void testMultipleThreads() throws Throwable { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port, ThriftRpcClient.COMPACT_PROTOCOL); - client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + ".0", port, 10); + client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0.0", port, 10); int threadCount = 100; ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount); ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount); http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java index f4fde57..9e2c356 100644 --- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java +++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSinkIT.java @@ -37,6 +37,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; +import java.io.IOException; +import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -60,11 +62,21 @@ public class TestHttpSinkIT { private HttpSink httpSink; + private static int findFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new AssertionError("Can not find free port.", e); + } + } + + private final int port = findFreePort(); + @Before public void setupSink() { if (httpSink == null) { Context httpSinkContext = new Context(); - httpSinkContext.put("endpoint", "http://localhost:8080/endpoint"); + httpSinkContext.put("endpoint", "http://localhost:" + port + "/endpoint"); httpSinkContext.put("requestTimeout", "2000"); httpSinkContext.put("connectTimeout", "1500"); httpSinkContext.put("acceptHeader", "application/json"); @@ -91,11 +103,11 @@ public class TestHttpSinkIT { @After public void waitForShutdown() throws InterruptedException { httpSink.stop(); - new CountDownLatch(1).await(500, TimeUnit.MILLISECONDS); + Thread.sleep(500); } @Rule - public WireMockRule service = new WireMockRule(wireMockConfig().port(8080)); + public WireMockRule service = new WireMockRule(wireMockConfig().port(port)); @Test public void ensureSuccessfulMessageDelivery() throws Exception { http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java index 6405d6c..71ed99e 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java @@ -22,8 +22,10 @@ import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.BindException; import java.net.InetAddress; +import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.List; import java.util.Properties; @@ -37,7 +39,6 @@ public class TestUtil { private static final Logger logger = LoggerFactory.getLogger(TestUtil.class); private static TestUtil instance = new TestUtil(); - private Random randPortGen = new Random(System.currentTimeMillis()); private KafkaLocal kafkaServer; private KafkaConsumer kafkaConsumer; private String hostname = "localhost"; @@ -72,19 +73,12 @@ public class TestUtil { zkProperties.load(Class.class.getResourceAsStream( "/zookeeper.properties")); - ZooKeeperLocal zookeeper; - while (true) { - //start local Zookeeper - try { - zkLocalPort = getNextPort(); - // override the Zookeeper client port with the generated one. - zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort)); - zookeeper = new ZooKeeperLocal(zkProperties); - break; - } catch (BindException bindEx) { - // bind exception. port is already in use. Try a different port. - } - } + //start local Zookeeper + zkLocalPort = getNextPort(); + // override the Zookeeper client port with the generated one. + zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort)); + new ZooKeeperLocal(zkProperties); + logger.info("ZooKeeper instance is successfully started on port " + zkLocalPort); @@ -92,20 +86,12 @@ public class TestUtil { "/kafka-server.properties")); // override the Zookeeper url. kafkaProperties.setProperty("zookeeper.connect", getZkUrl()); - while (true) { - kafkaLocalPort = getNextPort(); - // override the Kafka server port - kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort)); - kafkaServer = new KafkaLocal(kafkaProperties); - try { - kafkaServer.start(); - break; - } catch (BindException bindEx) { - // let's try another port. - } - } - logger.info("Kafka Server is successfully started on port " + - kafkaLocalPort); + kafkaLocalPort = getNextPort(); + // override the Kafka server port + kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort)); + kafkaServer = new KafkaLocal(kafkaProperties); + kafkaServer.start(); + logger.info("Kafka Server is successfully started on port " + kafkaLocalPort); return true; } catch (Exception e) { @@ -160,9 +146,10 @@ public class TestUtil { logger.info("Completed the tearDown phase."); } - private synchronized int getNextPort() { - // generate a random port number between 49152 and 65535 - return randPortGen.nextInt(65535 - 49152) + 49152; + private synchronized int getNextPort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } } public String getZkUrl() { http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 53bd65c..1186f6d 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.net.ServerSocket; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -41,8 +42,16 @@ public class KafkaSourceEmbeddedKafka { KafkaServerStartable kafkaServer; KafkaSourceEmbeddedZookeeper zookeeper; - int zkPort = 21818; // none-standard - int serverPort = 18922; + private static int findFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new AssertionError("Can not find free port.", e); + } + } + + private int zkPort = findFreePort(); // none-standard + private int serverPort = findFreePort(); KafkaProducer<String, byte[]> producer; File dir; http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java index 643795c..3492d7a 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java @@ -30,10 +30,11 @@ public class TestRpcClient { public static final String CONFIG_FILE_PRCCLIENT_TEST = "rpc-client-test.properties"; + private int port; @Before public void setUp() throws Exception { - StagedInstall.getInstance().startAgent( + port = StagedInstall.getInstance().startAgent( "rpccagent", CONFIG_FILE_PRCCLIENT_TEST); } @@ -44,8 +45,8 @@ public class TestRpcClient { @Test public void testRpcClient() throws Exception { - StagedInstall.waitUntilPortOpens("localhost", 12121, 20000); - RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 12121); + StagedInstall.waitUntilPortOpens("localhost", port, 20000); + RpcClient client = RpcClientFactory.getDefaultInstance("localhost", port); String[] text = {"foo", "bar", "xyz", "abc"}; for (String str : text) { client.append(EventBuilder.withBody(str.getBytes())); http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java index efb6457..992d3be 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java @@ -36,11 +36,11 @@ public class TestRpcClientCommunicationFailure { public void testFailure() throws Exception { try { - StagedInstall.getInstance().startAgent( + int port = StagedInstall.getInstance().startAgent( "rpccagent", CONFIG_FILE_PRCCLIENT_TEST); - StagedInstall.waitUntilPortOpens("localhost", 12121, 20000); + StagedInstall.waitUntilPortOpens("localhost", port, 20000); RpcClient client = RpcClientFactory.getDefaultInstance( - "localhost", 12121); + "localhost", port); String[] text = {"foo", "bar", "xyz", "abc"}; for (String str : text) { client.append(EventBuilder.withBody(str.getBytes())); http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java index 51194b6..ce586b8 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java @@ -33,6 +33,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.ServerSocket; import java.net.Socket; import java.util.List; import java.util.Map; @@ -78,6 +79,12 @@ public class StagedInstall { return INSTANCE; } + public static int findFreePort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + public synchronized boolean isRunning() { return process != null; } @@ -100,16 +107,17 @@ public class StagedInstall { Thread.sleep(3000); // sleep for 3s to let system shutdown } - public synchronized void startAgent(String name, String configResource) + public synchronized int startAgent(String name, String configResource) throws Exception { if (process != null) { throw new Exception("A process is already running"); } - + int port = findFreePort(); Properties props = new Properties(); props.load(ClassLoader.getSystemResourceAsStream(configResource)); - + props.put("rpccagent.sources.src1.port", String.valueOf(port)); startAgent(name, props); + return port; } public synchronized void startAgent(String name, Properties properties) http://git-wip-us.apache.org/repos/asf/flume/blob/e4312ad1/flume-ng-tests/src/test/resources/rpc-client-test.properties ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/resources/rpc-client-test.properties b/flume-ng-tests/src/test/resources/rpc-client-test.properties index 560f4c1..2f9117d 100644 --- a/flume-ng-tests/src/test/resources/rpc-client-test.properties +++ b/flume-ng-tests/src/test/resources/rpc-client-test.properties @@ -20,7 +20,7 @@ rpccagent.channels = ch1 rpccagent.sources.src1.type = avro rpccagent.sources.src1.bind = 127.0.0.1 -rpccagent.sources.src1.port = 12121 +#rpccagent.sources.src1.port = Set from java source rpccagent.sources.src1.channels = ch1 rpccagent.channels.ch1.type = memory
