This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3ae55bf399e9ef36e6f6f1cb14ef5cc4b0ebbb8c
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Fri Mar 5 22:12:27 2021 +0100

    Fixed flaky hdfs itest.
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 19 +++++++++++-----
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  | 26 ++++++++++++++++++++--
 tests/itests-netty-http/pom.xml                    |  5 +++++
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index 993933e..479985c 100644
--- 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,15 +59,23 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
 
 
     @BeforeEach
-    public void setUp() throws IOException, URISyntaxException {
+    public void setUp() throws IOException, URISyntaxException, 
InitializationError {
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), 
hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
         currentBasePath = new Path(currentPath);
 
-        if (!hdfsEasy.delete(currentBasePath)) {
-            // This is OK: directory may not exist on the path
-            LOG.debug("The directory at {} was not removed {}", 
currentBasePath.getName());
+        boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> 
hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
+                                                                        &&  
hdfsEasy.delete(new Path(currentBasePath, "initTest")));
+
+        if (hdfsServiceCorrectlyStarted) {
+            if (!hdfsEasy.delete(currentBasePath)) {
+                // This is OK: directory may not exist on the path
+                LOG.debug("The directory at {} was not removed", 
currentBasePath.getName());
+            }
+        } else {
+            throw new InitializationError("HDFS Service didn't start 
properly.");
         }
     }
 
@@ -132,7 +141,7 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
 
             LOG.debug("Retrieved file {} with contents: {}", f.getPath(), 
contents);
             boolean contains = contents.contains(matchString);
-            assertTrue(contains, "Unexpected content for the remote file " + 
f.getPath().getName());
+            assertTrue(contains, "Unexpected content for the remote file " + 
f.getPath().getName() + " content: [" + contents + "] should contain [" + 
matchString + "]");
         } catch (IOException e) {
             LOG.debug("Reading returned file {} failed: {}", f.getPath(), 
e.getMessage());
             fail("I/O error: " + e.getMessage());
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
index 7733fe8..4e95191 100644
--- 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -26,6 +26,7 @@ import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -110,8 +111,7 @@ public class HDFSEasy {
         try {
             return countFiles(path) >= minFiles;
         } catch (Exception e) {
-            LOG.warn("I/O exception while checking if file {} exists", 
path.getName());
-
+            LOG.warn("I/O exception: {} due to {} while checking if file {} 
exists", e.getMessage(), e.getCause(), path.getName());
             return false;
         }
     }
@@ -133,4 +133,26 @@ public class HDFSEasy {
             return false;
         }
     }
+
+    public boolean createFile(Path filePath, String content) {
+        FSDataOutputStream streamWriter = null;
+        try {
+            streamWriter = dfs.create(filePath);
+            streamWriter.writeBytes(content);
+            streamWriter.flush();
+        } catch (IOException e) {
+            LOG.debug("Error in file creation: " + e.getMessage());
+            return false;
+        } finally {
+            if (streamWriter != null) {
+                try {
+                    streamWriter.close();
+                } catch (IOException e) {
+                    LOG.debug("Error in file creation during stream close: " + 
e.getMessage());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index ddeb768..8fdaf9b 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -61,5 +61,10 @@
             <artifactId>mockwebserver</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

Reply via email to