This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-kafka-connector-0.7.x in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 3ae55bf399e9ef36e6f6f1cb14ef5cc4b0ebbb8c Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Fri Mar 5 22:12:27 2021 +0100 Fixed flaky hdfs itest. --- .../hdfs/sink/CamelSinkHDFSITCase.java | 19 +++++++++++----- .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java | 26 ++++++++++++++++++++-- tests/itests-netty-http/pom.xml | 5 +++++ 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java index 993933e..479985c 100644 --- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java +++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.runners.model.InitializationError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,15 +59,23 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest { @BeforeEach - public void setUp() throws IOException, URISyntaxException { + public void setUp() throws IOException, URISyntaxException, InitializationError { + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort()); String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/"; currentBasePath = new Path(currentPath); - if (!hdfsEasy.delete(currentBasePath)) { - // This is OK: directory may not exist on the path - LOG.debug("The directory at {} was not removed {}", currentBasePath.getName()); + boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test") + && hdfsEasy.delete(new Path(currentBasePath, "initTest"))); + + if (hdfsServiceCorrectlyStarted) { + if (!hdfsEasy.delete(currentBasePath)) { + // This is OK: directory may not exist on the path + LOG.debug("The directory at {} was not removed", currentBasePath.getName()); + } + } else { + throw new InitializationError("HDFS Service didn't start properly."); } } @@ -132,7 +141,7 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest { LOG.debug("Retrieved file {} with contents: {}", f.getPath(), contents); boolean contains = contents.contains(matchString); - assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName()); + assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName() + " content: [" + contents + "] should contain [" + matchString + "]"); } catch (IOException e) { LOG.debug("Reading returned file {} failed: {}", f.getPath(), e.getMessage()); fail("I/O error: " + e.getMessage()); diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java index 7733fe8..4e95191 100644 --- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java +++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java @@ -26,6 +26,7 @@ import java.util.Scanner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -110,8 +111,7 @@ public class HDFSEasy { try { return countFiles(path) >= minFiles; } catch (Exception e) { - LOG.warn("I/O exception while checking if file {} exists", path.getName()); - + LOG.warn("I/O exception: {} due to {} while checking if file {} exists", e.getMessage(), e.getCause(), path.getName()); return false; } } @@ -133,4 +133,26 @@ public class HDFSEasy { return false; } } + + public boolean createFile(Path filePath, String content) { + FSDataOutputStream streamWriter = null; + try { + streamWriter = dfs.create(filePath); + streamWriter.writeBytes(content); + streamWriter.flush(); + } catch (IOException e) { + LOG.debug("Error in file creation: " + e.getMessage()); + return false; + } finally { + if (streamWriter != null) { + try { + streamWriter.close(); + } catch (IOException e) { + LOG.debug("Error in file creation during stream close: " + e.getMessage()); + return false; + } + } + } + return true; + } } diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml index ddeb768..8fdaf9b 100644 --- a/tests/itests-netty-http/pom.xml +++ b/tests/itests-netty-http/pom.xml @@ -61,5 +61,10 @@ <artifactId>mockwebserver</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project>