This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ffe26a4  Added tests for Camel HDFS Sink
     new 8e3a13c  Merge pull request #277 from orpiske/hdfs-test
ffe26a4 is described below

commit ffe26a4ec7a3cc55a6d640eced755d38efd37b85
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Jun 3 17:41:51 2020 +0200

    Added tests for Camel HDFS Sink
---
 .../cassandra/sink/CamelSinkCassandraITCase.java   |   4 +-
 .../kafkaconnector/common/utils/TestUtils.java     |  13 +-
 tests/itests-hdfs/pom.xml                          |  61 +++++++++
 .../hdfs/services/ContainerLocalHDFSService.java   |  65 +++++++++
 .../hdfs/services/DataNodeContainer.java           |  98 ++++++++++++++
 .../kafkaconnector/hdfs/services/HDFSPorts.java    |  31 +++++
 .../kafkaconnector/hdfs/services/HDFSService.java  |  58 ++++++++
 .../hdfs/services/HDFSServiceFactory.java          |  46 +++++++
 .../hdfs/services/HadoopBaseContainer.java         |  48 +++++++
 .../hdfs/services/NameNodeContainer.java           |  50 +++++++
 .../hdfs/services/RemoteHDFSService.java           |  47 +++++++
 .../hdfs/sink/CamelHDFSPropertyFactory.java        |  71 ++++++++++
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 146 +++++++++++++++++++++
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  | 136 +++++++++++++++++++
 tests/itests-hdfs/src/test/resources/hdfs-site.xml |  28 ++++
 .../camel/kafkaconnector/hdfs/services/Dockerfile  |  57 ++++++++
 .../kafkaconnector/hdfs/services/core-site.xml     |  27 ++++
 .../kafkaconnector/hdfs/services/hdfs-site.xml     |  48 +++++++
 .../kafkaconnector/hdfs/services/run-datanode.sh   |  19 +++
 .../kafkaconnector/hdfs/services/run-namenode.sh   |  23 ++++
 tests/pom.xml                                      |   1 +
 21 files changed, 1071 insertions(+), 6 deletions(-)

diff --git 
a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
 
b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index cd6305c..dfe691e 100644
--- 
a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ 
b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -119,7 +119,9 @@ public class CamelSinkCassandraITCase extends 
AbstractKafkaTest {
             fail("Timed out wait for data to be added to the Kafka cluster");
         }
 
-        TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect);
+        if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
+            fail("Did not receive enough data");
+        }
         testDataDao.getData(this::checkRetrievedData);
         assertTrue(received >= expect,
                 String.format("Did not receive as much data as expected: %d < 
%d", received, expect));
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
index 202ad82..6e221bb 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
@@ -50,8 +50,8 @@ public final class TestUtils {
      * @param payload
      * @param <T>
      */
-    public static <T> void waitFor(Predicate<T> resourceCheck, T payload) {
-        boolean state;
+    public static <T> boolean waitFor(Predicate<T> resourceCheck, T payload) {
+        boolean state = false;
         int retries = 30;
         int waitTime = 1000;
         do {
@@ -69,6 +69,8 @@ public final class TestUtils {
             }
 
         } while (!state && retries > 0);
+
+        return state;
     }
 
 
@@ -76,8 +78,8 @@ public final class TestUtils {
      * Wait for a given condition to be true or the retry amount (30) to expire
      * @param resourceCheck
      */
-    public static void waitFor(BooleanSupplier resourceCheck) {
-        boolean state;
+    public static boolean waitFor(BooleanSupplier resourceCheck) {
+        boolean state = false;
         int retries = 30;
         int waitTime = 1000;
         do {
@@ -93,8 +95,9 @@ public final class TestUtils {
             } catch (InterruptedException e) {
                 break;
             }
-
         } while (!state && retries > 0);
+
+        return state;
     }
 
     /**
diff --git a/tests/itests-hdfs/pom.xml b/tests/itests-hdfs/pom.xml
new file mode 100644
index 0000000..c7de9fd
--- /dev/null
+++ b/tests/itests-hdfs/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-hdfs</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: HDFS</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-hdfs</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+    </dependencies>
+
+
+
+</project>
\ No newline at end of file
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/ContainerLocalHDFSService.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/ContainerLocalHDFSService.java
new file mode 100644
index 0000000..ca0947a
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/ContainerLocalHDFSService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+/**
+ * An {@link HDFSService} that runs HDFS locally using one name node container and one
+ * data node container attached to the same container network.
+ */
+public class ContainerLocalHDFSService implements HDFSService {
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerLocalHDFSService.class);
+    private final NameNodeContainer nameNodeContainer;
+    private final DataNodeContainer dataNodeContainer;
+
+    /**
+     * Creates the name node and data node containers on a newly created network.
+     * The containers are not started here; see {@link #initialize()}.
+     */
+    public ContainerLocalHDFSService() {
+        Network network = Network.newNetwork();
+
+        nameNodeContainer = new NameNodeContainer(network);
+        dataNodeContainer = new DataNodeContainer(network);
+    }
+
+    /**
+     * @return the address of the name node container, as reported by the container itself
+     */
+    @Override
+    public String getHDFSHost() {
+        return nameNodeContainer.getContainerIpAddress();
+    }
+
+    /**
+     * @return the IPC port of the name node container
+     */
+    @Override
+    public int getPort() {
+        return nameNodeContainer.getIpcPort();
+    }
+
+    /**
+     * Starts the name node first and then the data node, logging the address of each web UI.
+     */
+    @Override
+    public void initialize() {
+        nameNodeContainer.start();
+
+        String hdfsNameNodeWeb = nameNodeContainer.getContainerIpAddress() + ":" + nameNodeContainer.getHttpPort();
+        LOG.info("HDFS Name node web UI running at address http://{}", hdfsNameNodeWeb);
+
+        dataNodeContainer.start();
+
+        String hdfsDataNodeWeb = dataNodeContainer.getContainerIpAddress() + ":" + dataNodeContainer.getHttpPort();
+        LOG.info("HDFS Data node web UI running at address http://{}", hdfsDataNodeWeb);
+        // The host and port logged here are the values returned by getHDFSHost()/getPort(),
+        // i.e. they are read from the name node container.
+        LOG.info("HDFS Data node running at address {}:{}", getHDFSHost(), getPort());
+    }
+
+    /**
+     * Stops the data node and then the name node. The name node is stopped even if
+     * stopping the data node throws.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            dataNodeContainer.stop();
+        } finally {
+            nameNodeContainer.stop();
+        }
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/DataNodeContainer.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/DataNodeContainer.java
new file mode 100644
index 0000000..5cd51e7
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/DataNodeContainer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.dockerclient.DockerClientConfigUtils;
+
+/**
+ * Container that runs the HDFS data node via {@code /hadoop/run-datanode.sh}.
+ *
+ * The data node HTTP, data transfer and IPC ports are bound to the same port numbers
+ * on the host, and the container host name is set to the resolved docker host name.
+ */
+public class DataNodeContainer extends HadoopBaseContainer<DataNodeContainer> {
+    private static final Logger LOG = LoggerFactory.getLogger(DataNodeContainer.class);
+
+    // Must be declared after LOG: resolveDataNodeHost() may log a warning.
+    private static final String DATA_NODE_HOST = resolveDataNodeHost();
+
+    /**
+     * Creates a data node container named after the docker host resolved from the
+     * DOCKER_HOST environment variable ('localhost' when it is unset or unparseable).
+     *
+     * @param network the network the container is attached to
+     */
+    public DataNodeContainer(Network network) {
+        this(network, DATA_NODE_HOST);
+    }
+
+    /**
+     * Creates a data node container with the given name.
+     *
+     * @param network the network the container is attached to
+     * @param name the value used as both the container name and its host name
+     */
+    public DataNodeContainer(Network network, String name) {
+        super(network, name);
+
+        withCommand("sh", "-c", "/hadoop/run-datanode.sh");
+
+        withExposedPorts(HDFSPorts.DATA_NODE_HTTP_PORT, HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT,
+                HDFSPorts.DATA_NODE_IPC_PORT);
+
+        waitingFor(Wait.forHttp("/").forPort(HDFSPorts.DATA_NODE_HTTP_PORT));
+
+        /*
+         We need the name to be a valid hostname: the files are uploaded
+         directly to the data node host using the *hostname*. By default, the hostname
+         is not valid and not accessible from outside, therefore we trick the container
+         into using the localhost name so that when the data node is resolved, it actually
+         points to the port on the local host that is redirected inside the container.
+         */
+        withCreateContainerCmdModifier(
+            createContainerCmd -> {
+                createContainerCmd.withHostName(name);
+                createContainerCmd.withName(name);
+            }
+        );
+
+        addFixedExposedPort(HDFSPorts.DATA_NODE_HTTP_PORT, HDFSPorts.DATA_NODE_HTTP_PORT);
+        addFixedExposedPort(HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT, HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT);
+        addFixedExposedPort(HDFSPorts.DATA_NODE_IPC_PORT, HDFSPorts.DATA_NODE_IPC_PORT);
+    }
+
+    /**
+     * Resolves the host name to use for the data node from the DOCKER_HOST environment variable.
+     *
+     * @return the docker host address, or 'localhost' if DOCKER_HOST is unset, empty or not a valid URI
+     */
+    private static String resolveDataNodeHost() {
+        String dockerHost = System.getenv("DOCKER_HOST");
+
+        if (dockerHost == null || dockerHost.isEmpty()) {
+            return "localhost";
+        }
+
+        try {
+            return DockerClientConfigUtils.getDockerHostIpAddress(new URI(dockerHost));
+        } catch (URISyntaxException e) {
+            LOG.warn("Using 'localhost' as the docker host because the URI '{}' did not parse correctly: {}",
+                    dockerHost, e.getMessage(), e);
+            return "localhost";
+        }
+    }
+
+    /**
+     * @return the host port mapped to the data node HTTP port
+     */
+    @Override
+    public int getHttpPort() {
+        return getMappedPort(HDFSPorts.DATA_NODE_HTTP_PORT);
+    }
+
+    /**
+     * @return the data node data transfer port (fixed, same value inside and outside the container)
+     */
+    public int getDataTransferPort() {
+        return HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT;
+    }
+
+    /**
+     * @return the data node IPC port (fixed, same value inside and outside the container)
+     */
+    public int getIpcPort() {
+        return HDFSPorts.DATA_NODE_IPC_PORT;
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSPorts.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSPorts.java
new file mode 100644
index 0000000..190aef0
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSPorts.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+/**
+ * Port numbers shared by the HDFS name node and data node container definitions.
+ */
+final class HDFSPorts {
+    // Name node ports
+    public static final int NAME_NODE_HTTP_PORT = 50070;
+    public static final int NAME_NODE_IPC_PORT = 8020;
+
+    // Data node ports
+    public static final int DATA_NODE_HTTP_PORT = 50075;
+    public static final int DATA_NODE_DATA_TRANSFER_PORT = 50010;
+    public static final int DATA_NODE_IPC_PORT = 50020;
+
+    // Constants holder: not meant to be instantiated
+    private HDFSPorts() {
+    }
+
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSService.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSService.java
new file mode 100644
index 0000000..0b78d98
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Contract for the HDFS instance used by the tests. It doubles as a JUnit 5 extension:
+ * {@link #initialize()} runs before all tests and {@link #shutdown()} runs after all tests.
+ */
+public interface HDFSService extends BeforeAllCallback, AfterAllCallback {
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    /**
+     * Gets the hostname of the HDFS server
+     * @return the host name or address to connect to
+     */
+    String getHDFSHost();
+
+    /**
+     * Gets the port used by the HDFS server
+     * @return the port to connect to
+     */
+    int getPort();
+
+    /**
+     * JUnit callback: delegates to {@link #initialize()}.
+     */
+    @Override
+    default void beforeAll(ExtensionContext context) throws Exception {
+        initialize();
+    }
+
+    /**
+     * JUnit callback: delegates to {@link #shutdown()}.
+     */
+    @Override
+    default void afterAll(ExtensionContext context) throws Exception {
+        shutdown();
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java
new file mode 100644
index 0000000..17e0319
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link HDFSService} to use in the tests, based on the
+ * {@code hdfs.instance.type} system property.
+ */
+public final class HDFSServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(HDFSServiceFactory.class);
+
+    private HDFSServiceFactory() {
+
+    }
+
+    /**
+     * Creates the service matching the {@code hdfs.instance.type} system property.
+     *
+     * @return a {@link ContainerLocalHDFSService} when the property is unset or set to
+     *         'local-hdfs-container'; a {@link RemoteHDFSService} when it is set to 'remote'
+     * @throws UnsupportedOperationException if the property holds any other value
+     */
+    public static HDFSService createService() {
+        String instanceType = System.getProperty("hdfs.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-hdfs-container")) {
+            return new ContainerLocalHDFSService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new RemoteHDFSService();
+        }
+
+        LOG.error("Invalid HDFS instance type: {}. Must be either 'remote' or 'local-hdfs-container'",
+                instanceType);
+        throw new UnsupportedOperationException(String.format("Invalid HDFS instance type: %s", instanceType));
+
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HadoopBaseContainer.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HadoopBaseContainer.java
new file mode 100644
index 0000000..e01e478
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HadoopBaseContainer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import java.util.function.Consumer;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * Base class for the Hadoop containers. The image is built from the Dockerfile and
+ * resources found on the classpath under {@code org/apache/camel/kafkaconnector/hdfs/services/}.
+ */
+abstract class HadoopBaseContainer<T extends GenericContainer<T>> extends GenericContainer<T> {
+
+    /**
+     * @param network the network the container is attached to
+     * @param name the value used as both the container name and its host name
+     */
+    public HadoopBaseContainer(Network network, String name) {
+        super(hadoopImage());
+
+        withNetwork(network);
+
+        // Applied when the container is created: sets both the host name and the container name
+        Consumer<CreateContainerCmd> nameModifier = cmd -> {
+            cmd.withHostName(name);
+            cmd.withName(name);
+        };
+        withCreateContainerCmdModifier(nameModifier);
+    }
+
+    /**
+     * Describes the image to build, using the classpath resources as the build context.
+     */
+    private static ImageFromDockerfile hadoopImage() {
+        return new ImageFromDockerfile("hadoop-2x:ckc", false)
+                .withFileFromClasspath(".", "org/apache/camel/kafkaconnector/hdfs/services/");
+    }
+
+    /**
+     * @return the port of the node's HTTP endpoint
+     */
+    abstract int getHttpPort();
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/NameNodeContainer.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/NameNodeContainer.java
new file mode 100644
index 0000000..9b12e43
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/NameNodeContainer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+/**
+ * Container that runs the HDFS name node via {@code /hadoop/run-namenode.sh}.
+ *
+ * The name node HTTP and IPC ports are bound to the same port numbers on the host.
+ */
+public class NameNodeContainer extends HadoopBaseContainer<NameNodeContainer> {
+
+    /**
+     * Creates a name node container named 'namenode'.
+     *
+     * @param network the network the container is attached to
+     */
+    public NameNodeContainer(Network network) {
+        this(network, "namenode");
+    }
+
+    /**
+     * Creates a name node container with the given name.
+     *
+     * @param network the network the container is attached to
+     * @param name the value used as both the container name and its host name
+     */
+    public NameNodeContainer(Network network, String name) {
+        super(network, name);
+
+        withCommand("sh", "-c", "/hadoop/run-namenode.sh");
+        withExposedPorts(HDFSPorts.NAME_NODE_HTTP_PORT, HDFSPorts.NAME_NODE_IPC_PORT);
+
+        // The container is considered started once the HTTP endpoint responds
+        waitingFor(Wait.forHttp("/").forPort(HDFSPorts.NAME_NODE_HTTP_PORT));
+
+        addFixedExposedPort(HDFSPorts.NAME_NODE_HTTP_PORT, HDFSPorts.NAME_NODE_HTTP_PORT);
+        addFixedExposedPort(HDFSPorts.NAME_NODE_IPC_PORT, HDFSPorts.NAME_NODE_IPC_PORT);
+    }
+
+    /**
+     * @return the host port mapped to the name node HTTP port
+     */
+    @Override
+    public int getHttpPort() {
+        return getMappedPort(HDFSPorts.NAME_NODE_HTTP_PORT);
+    }
+
+    /**
+     * @return the name node IPC port (fixed, same value inside and outside the container)
+     */
+    public int getIpcPort() {
+        return HDFSPorts.NAME_NODE_IPC_PORT;
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java
new file mode 100644
index 0000000..6717147
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+/**
+ * An {@link HDFSService} pointing at an already running HDFS instance whose location
+ * is read from the {@code hdfs.host} and {@code hdfs.port} system properties.
+ */
+public class RemoteHDFSService implements HDFSService {
+
+    /**
+     * @return the value of the {@code hdfs.host} system property ({@code null} if unset)
+     */
+    @Override
+    public String getHDFSHost() {
+        return System.getProperty("hdfs.host");
+    }
+
+    /**
+     * @return the value of the {@code hdfs.port} system property parsed as an integer,
+     *         or 8020 when the property is not set
+     */
+    @Override
+    public int getPort() {
+        String configuredPort = System.getProperty("hdfs.port");
+
+        return configuredPort == null ? 8020 : Integer.parseInt(configuredPort);
+    }
+
+    /**
+     * Nothing to set up: the remote instance is managed externally.
+     */
+    @Override
+    public void initialize() {
+
+    }
+
+    /**
+     * Nothing to tear down: the remote instance is managed externally.
+     */
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelHDFSPropertyFactory.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelHDFSPropertyFactory.java
new file mode 100644
index 0000000..7437a67
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelHDFSPropertyFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.sink;
+
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Property factory for the Camel HDFS sink connector. Each {@code with...} method sets
+ * one connector property and returns the factory so that calls can be chained.
+ */
+final class CamelHDFSPropertyFactory extends SinkConnectorPropertyFactory<CamelHDFSPropertyFactory> {
+    private static final String STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
+
+    private CamelHDFSPropertyFactory() {
+
+    }
+
+    /**
+     * Creates a factory preconfigured with the connector name, a single task, the HDFS sink
+     * connector class, string key/value converters and a replication of 1.
+     */
+    public static CamelHDFSPropertyFactory basic() {
+        return new CamelHDFSPropertyFactory()
+                .withName("CamelHDFSSinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.hdfs.CamelHdfsSinkConnector")
+                .withKeyConverterClass(STRING_CONVERTER)
+                .withValueConverterClass(STRING_CONVERTER)
+                .withReplication(1);
+    }
+
+    /** Sets {@code camel.sink.path.hostName}. */
+    public CamelHDFSPropertyFactory withHostname(String value) {
+        return setProperty("camel.sink.path.hostName", value);
+    }
+
+    /** Sets {@code camel.sink.path.port}. */
+    public CamelHDFSPropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    /** Sets {@code camel.sink.path.path}. */
+    public CamelHDFSPropertyFactory withPath(String value) {
+        return setProperty("camel.sink.path.path", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.splitStrategy}. */
+    public CamelHDFSPropertyFactory withSplitStrategy(String value) {
+        return setProperty("camel.sink.endpoint.splitStrategy", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.replication}. */
+    public CamelHDFSPropertyFactory withReplication(int value) {
+        return setProperty("camel.sink.endpoint.replication", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.owner}. */
+    public CamelHDFSPropertyFactory withOwner(String value) {
+        return setProperty("camel.sink.endpoint.owner", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.append}. */
+    public CamelHDFSPropertyFactory withAppend(boolean value) {
+        return setProperty("camel.sink.endpoint.append", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.bufferSize}. */
+    public CamelHDFSPropertyFactory withBufferSize(int value) {
+        return setProperty("camel.sink.endpoint.bufferSize", value);
+    }
+
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
new file mode 100644
index 0000000..d1efb07
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.sink;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.hdfs.services.HDFSService;
+import org.apache.camel.kafkaconnector.hdfs.services.HDFSServiceFactory;
+import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Testcontainers
+public class CamelSinkHDFSITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static HDFSService hdfsService = HDFSServiceFactory.createService();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkHDFSITCase.class);
+
+    private HDFSEasy hdfsEasy;
+    private Path currentBasePath;
+
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-hdfs-kafka-connector"};
+    }
+
+
+    /**
+     * Opens a client for the HDFS instance under test and selects the base directory
+     * used by the test, removing a leftover directory with the same name if present.
+     *
+     * @throws IOException if the HDFS client cannot be initialized
+     * @throws URISyntaxException if the HDFS host/port do not form a valid URI
+     */
+    @BeforeEach
+    public void setUp() throws IOException, URISyntaxException {
+        hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
+
+        String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
+        currentBasePath = new Path(currentPath);
+
+        if (!hdfsEasy.delete(currentBasePath)) {
+            // This is OK: directory may not exist on the path
+            LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+        }
+    }
+
+    /**
+     * Removes the base directory used by the test that just ran.
+     */
+    @AfterEach
+    public void tearDown() {
+        // Guard against a setUp() that failed before these fields were assigned,
+        // so the cleanup does not add a NullPointerException on top of the real error
+        if (hdfsEasy == null || currentBasePath == null) {
+            return;
+        }
+
+        if (!hdfsEasy.delete(currentBasePath)) {
+            LOG.warn("The directory at {} was not removed", currentBasePath.getName());
+        }
+    }
+
+    private boolean filesCreated() {
+        return hdfsEasy.filesCreated(currentBasePath, expect);
+    }
+
+
+    private String sendKafkaMessages(String baseMessage, int count) throws 
java.util.concurrent.ExecutionException, InterruptedException {
+        LOG.info("Sending data to Kafka");
+        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < count; i++) {
+            
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), baseMessage 
+ i);
+        }
+        return baseMessage;
+    }
+
+    /**
+     * Sends {@code expect} messages to Kafka and checks that the HDFS sink connector
+     * writes the matching number of files, each containing the base message.
+     */
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
+                    .basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withHostname(hdfsService.getHDFSHost())
+                    .withPort(hdfsService.getPort())
+                    .withPath(currentBasePath.getName())
+                    .withSplitStrategy("MESSAGES:1,IDLE:1000");
+
+            connectorPropertyFactory.log();
+            getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+            final String baseMessage = "Sink test message: ";
+            sendKafkaMessages(baseMessage, expect);
+
+            boolean filesCreated = TestUtils.waitFor(this::filesCreated);
+            assertTrue(filesCreated, "The files were not created on the remote host");
+            // JUnit 5 takes (expected, actual, message); the order matters for the failure report
+            assertEquals(expect, hdfsEasy.countFiles(currentBasePath),
+                    "The number of files created vs expected do not match");
+            hdfsEasy.listFiles(currentBasePath)
+                    .stream()
+                    .filter(f -> !f.getPath().getName().contains(".opened"))
+                    .forEach(f -> printFile(f, baseMessage));
+
+        } catch (Exception e) {
+            LOG.error("HDFS test failed: {}", e.getMessage(), e);
+            // Keep the original exception as the cause of the test failure
+            fail(e.getMessage(), e);
+        }
+    }
+
+
+
+    private void printFile(LocatedFileStatus f, String matchString) {
+        try {
+            String contents = hdfsEasy.readFile(f.getPath());
+
+            LOG.debug("Retrieved file {} with contents: {}", f.getPath(), 
contents);
+            boolean contains = contents.contains(matchString);
+            assertTrue(contains, "Unexpected content for the remote file " + 
f.getPath().getName());
+        } catch (IOException e) {
+            LOG.debug("Reading returned file {} failed: {}", f.getPath(), 
e.getMessage());
+            fail("I/O error: " + e.getMessage());
+        }
+    }
+}
diff --git 
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
new file mode 100644
index 0000000..7733fe8
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HDFSEasy {
+    private static final Logger LOG = LoggerFactory.getLogger(HDFSEasy.class);
+
+    private DistributedFileSystem dfs = new DistributedFileSystem();
+
+    public HDFSEasy(String host, int port) throws URISyntaxException, 
IOException {
+        dfs.initialize(new URI("hdfs://" + host + ":" + port), new 
Configuration());
+    }
+
+    public List<LocatedFileStatus> listFiles(Path path) throws IOException {
+        RemoteIterator<LocatedFileStatus> i = dfs.listFiles(path, false);
+
+        List<LocatedFileStatus> retList = new ArrayList<>();
+        while (i.hasNext()) {
+            LocatedFileStatus locatedFileStatus = i.next();
+            retList.add(locatedFileStatus);
+        }
+
+        return retList;
+    }
+
+    public boolean delete(Path path) {
+        try {
+            if (dfs.exists(path)) {
+                LOG.debug("Removing HDFS directory {}", path.getName());
+                if (!dfs.delete(path, true)) {
+                    LOG.debug("Failed to remove directory {}", path.getName());
+
+                    return false;
+                }
+
+                return true;
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to remove HDFS directory {}: {}", path.getName(), 
e.getMessage(), e);
+        }
+
+        return false;
+    }
+
+    public String readFile(Path filePath) throws IOException {
+        final FSDataInputStream streamReader = dfs.open(filePath);
+        final Scanner scanner = new Scanner(streamReader);
+
+        StringBuilder sb = new StringBuilder();
+        while (scanner.hasNextLine()) {
+            sb.append(scanner.nextLine());
+        }
+
+        return sb.toString();
+    }
+
+    public String readFile(String filePath) throws IOException {
+        return readFile(new Path(filePath));
+    }
+
+    public int countFiles(Path path) throws IOException {
+        RemoteIterator<LocatedFileStatus> i = dfs.listFiles(path, false);
+        int files = 0;
+        while (i.hasNext()) {
+            files++;
+            i.next();
+        }
+
+        return files;
+    }
+
+    /**
+     * Checks if a set of (minimum number of) files was created on the given 
path representing a directory
+     * @param path the path to check for the files
+     * @param minFiles the number of files created (using 0 just checks if the 
directory is there)
+     * @return true if the path contains at least minFiles and false otherwise
+     */
+    public boolean filesCreated(Path path, int minFiles) {
+        try {
+            return countFiles(path) >= minFiles;
+        } catch (Exception e) {
+            LOG.warn("I/O exception while checking if file {} exists", 
path.getName());
+
+            return false;
+        }
+    }
+
+    public boolean filesCreated(Path path) {
+        return filesCreated(path, 0);
+    }
+
+    public boolean filesCreated(String path) {
+        return filesCreated(new Path(path));
+    }
+
+    public boolean exists(Path path) {
+        try {
+            return dfs.exists(path);
+        } catch (Exception e) {
+            LOG.warn("I/O exception while checking if file {} exists", 
path.getName());
+
+            return false;
+        }
+    }
+}
diff --git a/tests/itests-hdfs/src/test/resources/hdfs-site.xml 
b/tests/itests-hdfs/src/test/resources/hdfs-site.xml
new file mode 100644
index 0000000..c68b177
--- /dev/null
+++ b/tests/itests-hdfs/src/test/resources/hdfs-site.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!-- This one is used by the clients (both the Camel one as well as the 
integration test one) -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>dfs.client.use.datanode.hostname</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git 
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/Dockerfile
 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/Dockerfile
new file mode 100644
index 0000000..1bba317
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/Dockerfile
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+FROM fedora:31 as builder
+ARG HADOOP_VERSION
+ENV HADOOP_VERSION ${HADOOP_VERSION:-2.10.0}
+RUN curl 
https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
 -o hadoop.tar.gz
+RUN mkdir -p hadoop && tar --strip-components=1 -xvf hadoop.tar.gz -C /hadoop 
&& rm -f hadoop.tar.gz
+
+FROM fedora:31
+MAINTAINER Otavio Rodolfo Piske <[email protected]>
+ARG HADOOP_VERSION
+ENV HADOOP_VERSION ${HADOOP_VERSION:-2.10.0}
+EXPOSE 8020 9000 50010 50020 50070 50075
+RUN dnf install -y java-1.8.0-openjdk-headless tar gzip rsync which procps-ng
+ENV JAVA_HOME /etc/alternatives/jre
+COPY --from=builder /hadoop /hadoop
+ADD core-site.xml /hadoop/etc/hadoop/core-site.xml
+ADD hdfs-site.xml /hadoop/etc/hadoop/hdfs-site.xml
+ADD run-datanode.sh /hadoop
+ADD run-namenode.sh /hadoop
+RUN chmod +x /hadoop/*.sh
+ENV HADOOP_HOME /hadoop
+ENV HADOOP_LOG_DIR=${HADOOP_HOME}/logs
+# Uncomment this line for enabling debug log
+# ENV HADOOP_ROOT_LOGGER=DEBUG,RFA,console
+VOLUME /hdfs/
+WORKDIR /hadoop
+
+# To run this image manually, follow these steps:
+
+# Create a network:
+# docker network create hadoop-tmp
+
+# Namenode starts with (format + runtime):
+# bin/hdfs --config /hadoop/etc/hadoop/  namenode -format && bin/hdfs --config 
/hadoop/etc/hadoop/  namenode
+
+# All in one shot from docker:
+# docker run --rm -it -p 8020:8020 -p 9000:9000 -p 50070:50070 --network 
hadoop-tmp --name namenode hadoop /hadoop/run-namenode.sh
+
+# Datanode starts with:
+# docker run -p 50075:50075 -p 50010:50010 -p 50020:50020 --rm -it --network 
hadoop-tmp --name datanode-1 hadoop /hadoop/run-datanode.sh datanode-1
+
+# If you need more nodes, just increase the number on the datanode name (i.e.: datanode-2, datanode-3, etc.)
+# docker run --rm -it --network hadoop-tmp --name datanode-2 hadoop 
/hadoop/run-datanode.sh
diff --git 
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/core-site.xml
 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/core-site.xml
new file mode 100644
index 0000000..c817310
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/core-site.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://namenode:8020</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git 
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/hdfs-site.xml
 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/hdfs-site.xml
new file mode 100644
index 0000000..6a09c85
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/hdfs-site.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!-- This one is used by the HDFS instance on the containers -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+    <property>
+        <name>dfs.namenode.name.dir</name>
+        <value>file:///hdfs/namenode</value>
+    </property>
+    <property>
+        <name>dfs.datanode.data.dir</name>
+        <value>file:///hdfs/datanode</value>
+    </property>
+    <property>
+        <name>dfs.permissions.enabled</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>dfs.client.use.datanode.hostname</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>dfs.datanode.use.datanode.hostname</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git 
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-datanode.sh
 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-datanode.sh
new file mode 100755
index 0000000..d1cfc3d
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-datanode.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+cd "${HDP_HOME}"
+
+bin/hdfs --config /hadoop/etc/hadoop/ datanode
\ No newline at end of file
diff --git 
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-namenode.sh
 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-namenode.sh
new file mode 100755
index 0000000..269d7c1
--- /dev/null
+++ 
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-namenode.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+cd "${HDP_HOME}"
+
+echo "Formatting the HDFS"
+bin/hdfs --config /hadoop/etc/hadoop/ namenode -format
+
+echo "Running Hadoop's namenode"
+bin/hdfs --config /hadoop/etc/hadoop/ namenode
\ No newline at end of file
diff --git a/tests/pom.xml b/tests/pom.xml
index 94142f9..943cec2 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -51,6 +51,7 @@
         <module>itests-timer</module>
         <module>itests-slack</module>
         <module>itests-salesforce</module>
+        <module>itests-hdfs</module>
     </modules>
 
 

Reply via email to