This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit dff4d98b47cfc4b75038bb2c6422616fe39cacc9
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Fri Mar 5 22:12:27 2021 +0100

    Fixed flaky hdfs itest.
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  28 +++--
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  |  26 ++++-
 tests/itests-netty-http/pom.xml                    |  61 ++++++++++
 .../surce/CamelNettyHTTPPropertyFactory.java       |  61 ++++++++++
 .../surce/CamelSourceNettyHTTPITCase.java          | 123 +++++++++++++++++++++
 5 files changed, 287 insertions(+), 12 deletions(-)

diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index f12f310..55cf21f 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -17,11 +17,6 @@
 
 package org.apache.camel.kafkaconnector.hdfs.sink;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -37,9 +32,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -74,16 +75,23 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     }
 
     @BeforeEach
-    public void setUp() throws IOException, URISyntaxException {
+    public void setUp() throws IOException, URISyntaxException, InitializationError {
         topicName = getTopicForTest(this);
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
         currentBasePath = new Path(currentPath);
 
-        if (!hdfsEasy.delete(currentBasePath)) {
-            // This is OK: directory may not exist on the path
-            LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+        boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
+                && hdfsEasy.delete(new Path(currentBasePath, "initTest")));
+
+        if(hdfsServiceCorrectlyStarted) {
+            if (!hdfsEasy.delete(currentBasePath)) {
+                // This is OK: directory may not exist on the path
+                LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+            }
+        } else {
+            throw new InitializationError("HDFS Service didn't start properly.");
         }
     }
 
@@ -136,7 +144,7 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
 
             LOG.debug("Retrieved file {} with contents: {}", f.getPath(), contents);
             boolean contains = contents.contains(matchString);
-            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName());
+            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName() + " content: [" + contents + "] should contain [" + matchString + "]");
         } catch (IOException e) {
             LOG.debug("Reading returned file {} failed: {}", f.getPath(), e.getMessage());
             fail("I/O error: " + e.getMessage());
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
index 7733fe8..4e95191 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -26,6 +26,7 @@ import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -110,8 +111,7 @@ public class HDFSEasy {
         try {
             return countFiles(path) >= minFiles;
         } catch (Exception e) {
-            LOG.warn("I/O exception while checking if file {} exists", path.getName());
-
+            LOG.warn("I/O exception: {} due to {} while checking if file {} exists", e.getMessage(), e.getCause(), path.getName());
             return false;
         }
     }
@@ -133,4 +133,26 @@ public class HDFSEasy {
             return false;
         }
     }
+
+    public boolean createFile(Path filePath, String content) {
+        FSDataOutputStream streamWriter = null;
+        try {
+            streamWriter = dfs.create(filePath);
+            streamWriter.writeBytes(content);
+            streamWriter.flush();
+        } catch (IOException e) {
+            LOG.debug("Error in file creation: " + e.getMessage());
+            return false;
+        } finally {
+            if (streamWriter != null) {
+                try {
+                    streamWriter.close();
+                } catch (IOException e) {
+                    LOG.debug("Error in file creation during stream close: " + e.getMessage());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
new file mode 100644
index 0000000..ddb8bf7
--- /dev/null
+++ b/tests/itests-netty-http/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-netty-http</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Netty HTTP</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty-http</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
new file mode 100644
index 0000000..1562328
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> {
+    private CamelNettyHTTPPropertyFactory() {
+
+    }
+
+    public CamelNettyHTTPPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+
+    public CamelNettyHTTPPropertyFactory withProtocol(String protocol) {
+        return setProperty("camel.source.path.protocol", protocol);
+    }
+
+    public CamelNettyHTTPPropertyFactory withPort(int port) {
+        return setProperty("camel.source.path.port", String.valueOf(port));
+    }
+
+    public CamelNettyHTTPPropertyFactory withSync(boolean sync) {
+        return setProperty("camel.source.endpoint.sync", String.valueOf(sync));
+    }
+
+    public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) {
+        return setProperty("camel.source.endpoint.receiveBufferSize", String.valueOf(size));
+    }
+
+    public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) {
+        setProperty("transforms", "cameltypeconverter");
+        setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value");
+        return setProperty("transforms.cameltypeconverter.target.type", targetClass);
+    }
+
+    public static CamelNettyHTTPPropertyFactory basic() {
+        return new CamelNettyHTTPPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelNettyHttpSourceConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
new file mode 100644
index 0000000..e7e6468
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final String TEST_MESSAGE = "testMessage";
+
+    private String topicName;
+
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterEach
+    public void tearDown() {}
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Override
+    protected void produceTestData() {
+        int retriesLeft = 10;
+        boolean success = false;
+        while(retriesLeft > 0 && !success) {
+            try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
+
+                byte[] ipAddr = new byte[]{127, 0, 0, 1};
+                InetAddress localhost = InetAddress.getByAddress(ipAddr);
+                final HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT);
+
+                LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI());
+
+                httpPost.setEntity(new StringEntity(TEST_MESSAGE));
+
+                CloseableHttpResponse response = httpclient.execute(httpPost);
+                assertEquals(200, response.getStatusLine().getStatusCode());
+                response.close();
+                httpPost.releaseConnection();
+                success = true;
+                LOG.info("Request success at {} attempt.", retriesLeft);
+            } catch (IOException e) {
+                if(retriesLeft == 1) {
+                    e.printStackTrace();
+                    fail("There should be no exceptions in sending the http test message.");
+                } else {
+                    retriesLeft--;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException interruptedException) {
+                        interruptedException.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    }
+}