This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ffe26a4 Added tests for Camel HDFS Sink
new 8e3a13c Merge pull request #277 from orpiske/hdfs-test
ffe26a4 is described below
commit ffe26a4ec7a3cc55a6d640eced755d38efd37b85
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Jun 3 17:41:51 2020 +0200
Added tests for Camel HDFS Sink
---
.../cassandra/sink/CamelSinkCassandraITCase.java | 4 +-
.../kafkaconnector/common/utils/TestUtils.java | 13 +-
tests/itests-hdfs/pom.xml | 61 +++++++++
.../hdfs/services/ContainerLocalHDFSService.java | 65 +++++++++
.../hdfs/services/DataNodeContainer.java | 98 ++++++++++++++
.../kafkaconnector/hdfs/services/HDFSPorts.java | 31 +++++
.../kafkaconnector/hdfs/services/HDFSService.java | 58 ++++++++
.../hdfs/services/HDFSServiceFactory.java | 46 +++++++
.../hdfs/services/HadoopBaseContainer.java | 48 +++++++
.../hdfs/services/NameNodeContainer.java | 50 +++++++
.../hdfs/services/RemoteHDFSService.java | 47 +++++++
.../hdfs/sink/CamelHDFSPropertyFactory.java | 71 ++++++++++
.../hdfs/sink/CamelSinkHDFSITCase.java | 146 +++++++++++++++++++++
.../camel/kafkaconnector/hdfs/utils/HDFSEasy.java | 136 +++++++++++++++++++
tests/itests-hdfs/src/test/resources/hdfs-site.xml | 28 ++++
.../camel/kafkaconnector/hdfs/services/Dockerfile | 57 ++++++++
.../kafkaconnector/hdfs/services/core-site.xml | 27 ++++
.../kafkaconnector/hdfs/services/hdfs-site.xml | 48 +++++++
.../kafkaconnector/hdfs/services/run-datanode.sh | 19 +++
.../kafkaconnector/hdfs/services/run-namenode.sh | 23 ++++
tests/pom.xml | 1 +
21 files changed, 1071 insertions(+), 6 deletions(-)
diff --git
a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index cd6305c..dfe691e 100644
---
a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++
b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -119,7 +119,9 @@ public class CamelSinkCassandraITCase extends
AbstractKafkaTest {
fail("Timed out wait for data to be added to the Kafka cluster");
}
- TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect);
+ if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
+ fail("Did not receive enough data");
+ }
testDataDao.getData(this::checkRetrievedData);
assertTrue(received >= expect,
String.format("Did not receive as much data as expected: %d <
%d", received, expect));
diff --git
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
index 202ad82..6e221bb 100644
---
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
+++
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/TestUtils.java
@@ -50,8 +50,8 @@ public final class TestUtils {
* @param payload
* @param <T>
*/
- public static <T> void waitFor(Predicate<T> resourceCheck, T payload) {
- boolean state;
+ public static <T> boolean waitFor(Predicate<T> resourceCheck, T payload) {
+ boolean state = false;
int retries = 30;
int waitTime = 1000;
do {
@@ -69,6 +69,8 @@ public final class TestUtils {
}
} while (!state && retries > 0);
+
+ return state;
}
@@ -76,8 +78,8 @@ public final class TestUtils {
* Wait for a given condition to be true or the retry amount (30) to expire
* @param resourceCheck
*/
- public static void waitFor(BooleanSupplier resourceCheck) {
- boolean state;
+ public static boolean waitFor(BooleanSupplier resourceCheck) {
+ boolean state = false;
int retries = 30;
int waitTime = 1000;
do {
@@ -93,8 +95,9 @@ public final class TestUtils {
} catch (InterruptedException e) {
break;
}
-
} while (!state && retries > 0);
+
+ return state;
}
/**
diff --git a/tests/itests-hdfs/pom.xml b/tests/itests-hdfs/pom.xml
new file mode 100644
index 0000000..c7de9fd
--- /dev/null
+++ b/tests/itests-hdfs/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-hdfs</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: HDFS</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-hdfs</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+
+
+</project>
\ No newline at end of file
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/ContainerLocalHDFSService.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/ContainerLocalHDFSService.java
new file mode 100644
index 0000000..ca0947a
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/ContainerLocalHDFSService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class ContainerLocalHDFSService implements HDFSService {
+ private static final Logger LOG =
LoggerFactory.getLogger(ContainerLocalHDFSService.class);
+ private final NameNodeContainer nameNodeContainer;
+ private final DataNodeContainer dataNodeContainer;
+
+ public ContainerLocalHDFSService() {
+ Network network = Network.newNetwork();
+
+ nameNodeContainer = new NameNodeContainer(network);
+ dataNodeContainer = new DataNodeContainer(network);
+ }
+
+ @Override
+ public String getHDFSHost() {
+ return nameNodeContainer.getContainerIpAddress();
+ }
+
+ @Override
+ public int getPort() {
+ return nameNodeContainer.getIpcPort();
+ }
+
+ @Override
+ public void initialize() {
+ nameNodeContainer.start();
+
+ String hdfsNameNodeWeb = nameNodeContainer.getContainerIpAddress() +
":" + nameNodeContainer.getHttpPort();
+ LOG.info("HDFS Name node web UI running at address http://{}",
hdfsNameNodeWeb);
+
+ dataNodeContainer.start();
+
+ String hdfsDataNodeWeb = dataNodeContainer.getContainerIpAddress() +
":" + dataNodeContainer.getHttpPort();
+ LOG.info("HDFS Data node web UI running at address http://{}",
hdfsDataNodeWeb);
+ LOG.info("HDFS Data node running at address {}:{}", getHDFSHost(),
getPort());
+ }
+
+ @Override
+ public void shutdown() {
+ dataNodeContainer.stop();
+ nameNodeContainer.stop();
+ }
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/DataNodeContainer.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/DataNodeContainer.java
new file mode 100644
index 0000000..5cd51e7
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/DataNodeContainer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.dockerclient.DockerClientConfigUtils;
+
+public class DataNodeContainer extends HadoopBaseContainer<DataNodeContainer> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DataNodeContainer.class);
+ private static String dataNodeHost = "localhost";
+
+ static {
+ String dockerHost = System.getenv("DOCKER_HOST");
+
+ if (dockerHost != null && !dockerHost.isEmpty()) {
+ try {
+ URI dockerHostUri = new URI(dockerHost);
+ dataNodeHost =
DockerClientConfigUtils.getDockerHostIpAddress(dockerHostUri);
+
+ } catch (URISyntaxException e) {
+                LOG.warn("Using 'localhost' as the docker host because the URI
'{}' did not parse correctly: {}",
+ dockerHost, e.getMessage(), e);
+ }
+ }
+ }
+
+ public DataNodeContainer(Network network) {
+ this(network, dataNodeHost);
+ }
+
+
+
+ public DataNodeContainer(Network network, String name) {
+ super(network, name);
+
+ withCommand("sh", "-c", "/hadoop/run-datanode.sh");
+
+ withExposedPorts(HDFSPorts.DATA_NODE_HTTP_PORT,
HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT, HDFSPorts.DATA_NODE_IPC_PORT);
+
+ waitingFor(Wait.forHttp("/").forPort(HDFSPorts.DATA_NODE_HTTP_PORT));
+
+
+
+ /*
+ We need the name to be a valid hostname: the files are uploaded
+ directly to the dataNode host using the *hostname*. By default, the
hostname
is not valid and not accessible from outside, therefore we trick the
container
+ into using the localhost name so when the data node is resolved, it
actually
+ points to the port on the local host that is redirected inside the
container.
+ */
+ withCreateContainerCmdModifier(
+ createContainerCmd -> {
+ createContainerCmd.withHostName(name);
+ createContainerCmd.withName(name);
+ }
+ );
+
+ addFixedExposedPort(HDFSPorts.DATA_NODE_HTTP_PORT,
HDFSPorts.DATA_NODE_HTTP_PORT);
+ addFixedExposedPort(HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT,
HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT);
+ addFixedExposedPort(HDFSPorts.DATA_NODE_IPC_PORT,
HDFSPorts.DATA_NODE_IPC_PORT);
+ }
+
+ public int getHttpPort() {
+ return getMappedPort(HDFSPorts.DATA_NODE_HTTP_PORT);
+ }
+
+ public int getDataTransferPort() {
+ return HDFSPorts.DATA_NODE_DATA_TRANSFER_PORT;
+ }
+
+ public int getIpcPort() {
+ return HDFSPorts.DATA_NODE_IPC_PORT;
+ }
+
+
+
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSPorts.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSPorts.java
new file mode 100644
index 0000000..190aef0
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSPorts.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+final class HDFSPorts {
+ public static final int NAME_NODE_HTTP_PORT = 50070;
+ public static final int NAME_NODE_IPC_PORT = 8020;
+
+ public static final int DATA_NODE_HTTP_PORT = 50075;
+ public static final int DATA_NODE_DATA_TRANSFER_PORT = 50010;
+ public static final int DATA_NODE_IPC_PORT = 50020;
+
+ private HDFSPorts() {
+ }
+
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSService.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSService.java
new file mode 100644
index 0000000..0b78d98
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface HDFSService extends BeforeAllCallback, AfterAllCallback {
+
+ /**
+ * Gets the hostname of the HDFS server
+     * @return the hostname of the HDFS server
+ */
+ String getHDFSHost();
+
+ /**
+ * Gets the port used by the HDFS server
+     * @return the port used by the HDFS server
+ */
+ int getPort();
+
+ /**
+ * Perform any initialization necessary
+ */
+ void initialize();
+
+ /**
+ * Shuts down the service after the test has completed
+ */
+ void shutdown();
+
+
+ @Override
+ default void beforeAll(ExtensionContext extensionContext) throws Exception
{
+ initialize();
+ }
+
+ @Override
+ default void afterAll(ExtensionContext extensionContext) throws Exception {
+ shutdown();
+ }
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java
new file mode 100644
index 0000000..17e0319
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HDFSServiceFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(HDFSServiceFactory.class);
+
+ private HDFSServiceFactory() {
+
+ }
+
+ public static HDFSService createService() {
+ String instanceType = System.getProperty("hdfs.instance.type");
+
+ if (instanceType == null ||
instanceType.equals("local-hdfs-container")) {
+ return new ContainerLocalHDFSService();
+ }
+
+ if (instanceType.equals("remote")) {
+ return new RemoteHDFSService();
+ }
+
+ LOG.error("Invalid HDFS instance type: {}. Must be either 'remote' or
'local-hdfs-container'",
+ instanceType);
+ throw new UnsupportedOperationException(String.format("Invalid HDFS
instance type: %s", instanceType));
+
+ }
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HadoopBaseContainer.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HadoopBaseContainer.java
new file mode 100644
index 0000000..e01e478
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HadoopBaseContainer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import java.util.function.Consumer;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+abstract class HadoopBaseContainer<T extends GenericContainer<T>> extends
GenericContainer<T> {
+
+ public HadoopBaseContainer(Network network, String name) {
+ super(new ImageFromDockerfile("hadoop-2x:ckc", false)
+ .withFileFromClasspath(".",
+ "org/apache/camel/kafkaconnector/hdfs/services/"));
+
+ withNetwork(network);
+
+ withCreateContainerCmdModifier(
+ new Consumer<CreateContainerCmd>() {
+ @Override
+ public void accept(CreateContainerCmd createContainerCmd) {
+ createContainerCmd.withHostName(name);
+ createContainerCmd.withName(name);
+ }
+ }
+ );
+ }
+
+ abstract int getHttpPort();
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/NameNodeContainer.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/NameNodeContainer.java
new file mode 100644
index 0000000..9b12e43
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/NameNodeContainer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class NameNodeContainer extends HadoopBaseContainer<NameNodeContainer> {
+
+
+
+ public NameNodeContainer(Network network) {
+ this(network, "namenode");
+ }
+
+ public NameNodeContainer(Network network, String name) {
+ super(network, name);
+
+ withCommand("sh", "-c", "/hadoop/run-namenode.sh");
+ withExposedPorts(HDFSPorts.NAME_NODE_HTTP_PORT,
HDFSPorts.NAME_NODE_IPC_PORT);
+
+ waitingFor(Wait.forHttp("/").forPort(HDFSPorts.NAME_NODE_HTTP_PORT));
+
+ addFixedExposedPort(HDFSPorts.NAME_NODE_HTTP_PORT,
HDFSPorts.NAME_NODE_HTTP_PORT);
+ addFixedExposedPort(HDFSPorts.NAME_NODE_IPC_PORT,
HDFSPorts.NAME_NODE_IPC_PORT);
+ }
+
+ public int getHttpPort() {
+ return getMappedPort(HDFSPorts.NAME_NODE_HTTP_PORT);
+ }
+
+ public int getIpcPort() {
+ return HDFSPorts.NAME_NODE_IPC_PORT;
+ }
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java
new file mode 100644
index 0000000..6717147
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/RemoteHDFSService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.services;
+
+public class RemoteHDFSService implements HDFSService {
+
+ @Override
+ public void initialize() {
+
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public String getHDFSHost() {
+ return System.getProperty("hdfs.host");
+ }
+
+ @Override
+ public int getPort() {
+ String strPort = System.getProperty("hdfs.port");
+
+ if (strPort != null) {
+ return Integer.parseInt(strPort);
+ }
+
+ return 8020;
+ }
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelHDFSPropertyFactory.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelHDFSPropertyFactory.java
new file mode 100644
index 0000000..7437a67
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelHDFSPropertyFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.sink;
+
+
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+final class CamelHDFSPropertyFactory extends
SinkConnectorPropertyFactory<CamelHDFSPropertyFactory> {
+ private CamelHDFSPropertyFactory() {
+
+ }
+
+ public CamelHDFSPropertyFactory withHostname(String value) {
+ return setProperty("camel.sink.path.hostName", value);
+ }
+
+ public CamelHDFSPropertyFactory withPort(int value) {
+ return setProperty("camel.sink.path.port", value);
+ }
+
+ public CamelHDFSPropertyFactory withPath(String value) {
+ return setProperty("camel.sink.path.path", value);
+ }
+
+ public CamelHDFSPropertyFactory withSplitStrategy(String value) {
+ return setProperty("camel.sink.endpoint.splitStrategy", value);
+ }
+
+ public CamelHDFSPropertyFactory withReplication(int value) {
+ return setProperty("camel.sink.endpoint.replication", value);
+ }
+
+ public CamelHDFSPropertyFactory withOwner(String value) {
+ return setProperty("camel.sink.endpoint.owner", value);
+ }
+
+ public CamelHDFSPropertyFactory withAppend(boolean value) {
+ return setProperty("camel.sink.endpoint.append", value);
+ }
+
+ public CamelHDFSPropertyFactory withBufferSize(int value) {
+ return setProperty("camel.sink.endpoint.bufferSize", value);
+ }
+
+
+ public static CamelHDFSPropertyFactory basic() {
+ return new CamelHDFSPropertyFactory()
+ .withName("CamelHDFSSinkConnector")
+ .withTasksMax(1)
+
.withConnectorClass("org.apache.camel.kafkaconnector.hdfs.CamelHdfsSinkConnector")
+
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withReplication(1);
+ }
+
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
new file mode 100644
index 0000000..d1efb07
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.sink;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.hdfs.services.HDFSService;
+import org.apache.camel.kafkaconnector.hdfs.services.HDFSServiceFactory;
+import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Testcontainers
+public class CamelSinkHDFSITCase extends AbstractKafkaTest {
+ @RegisterExtension
+ public static HDFSService hdfsService = HDFSServiceFactory.createService();
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkHDFSITCase.class);
+
+ private HDFSEasy hdfsEasy;
+ private Path currentBasePath;
+
+ private final int expect = 10;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-hdfs-kafka-connector"};
+ }
+
+
+ @BeforeEach
+ public void setUp() throws IOException, URISyntaxException {
+ hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(),
hdfsService.getPort());
+
+ String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
+ currentBasePath = new Path(currentPath);
+
+ if (!hdfsEasy.delete(currentBasePath)) {
+ // This is OK: directory may not exist on the path
+            LOG.debug("The directory at {} was not removed",
currentBasePath.getName());
+ }
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (!hdfsEasy.delete(currentBasePath)) {
+            LOG.warn("The directory at {} was not removed",
currentBasePath.getName());
+ }
+ }
+
+ private boolean filesCreated() {
+ return hdfsEasy.filesCreated(currentBasePath, expect);
+ }
+
+
+ private String sendKafkaMessages(String baseMessage, int count) throws
java.util.concurrent.ExecutionException, InterruptedException {
+ LOG.info("Sending data to Kafka");
+ KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
+
+ for (int i = 0; i < count; i++) {
+
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), baseMessage
+ i);
+ }
+ return baseMessage;
+ }
+
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceive() {
+ try {
+ ConnectorPropertyFactory connectorPropertyFactory =
CamelHDFSPropertyFactory
+ .basic()
+ .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withHostname(hdfsService.getHDFSHost())
+ .withPort(hdfsService.getPort())
+ .withPath(currentBasePath.getName())
+ .withSplitStrategy("MESSAGES:1,IDLE:1000");
+
+ connectorPropertyFactory.log();
+
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory,
1);
+
+ final String baseMessage = "Sink test message: ";
+ sendKafkaMessages(baseMessage, expect);
+
+ boolean filesCreated = TestUtils.waitFor(this::filesCreated);
+ assertTrue(filesCreated, "The files were not created on the remote
host");
+ assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The
number of files created vs expected do not match");
+ hdfsEasy.listFiles(currentBasePath)
+ .stream()
+ .filter(f -> !f.getPath().getName().contains(".opened"))
+ .forEach(f -> printFile(f, baseMessage));
+
+ } catch (Exception e) {
+ LOG.error("HDFS test failed: {}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+ }
+
+
+
+ private void printFile(LocatedFileStatus f, String matchString) {
+ try {
+ String contents = hdfsEasy.readFile(f.getPath());
+
+ LOG.debug("Retrieved file {} with contents: {}", f.getPath(),
contents);
+ boolean contains = contents.contains(matchString);
+ assertTrue(contains, "Unexpected content for the remote file " +
f.getPath().getName());
+ } catch (IOException e) {
+ LOG.debug("Reading returned file {} failed: {}", f.getPath(),
e.getMessage());
+ fail("I/O error: " + e.getMessage());
+ }
+ }
+}
diff --git
a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
new file mode 100644
index 0000000..7733fe8
--- /dev/null
+++
b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.hdfs.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HDFSEasy {
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSEasy.class);
+
+ private DistributedFileSystem dfs = new DistributedFileSystem();
+
+ public HDFSEasy(String host, int port) throws URISyntaxException,
IOException {
+ dfs.initialize(new URI("hdfs://" + host + ":" + port), new
Configuration());
+ }
+
+ public List<LocatedFileStatus> listFiles(Path path) throws IOException {
+ RemoteIterator<LocatedFileStatus> i = dfs.listFiles(path, false);
+
+ List<LocatedFileStatus> retList = new ArrayList<>();
+ while (i.hasNext()) {
+ LocatedFileStatus locatedFileStatus = i.next();
+ retList.add(locatedFileStatus);
+ }
+
+ return retList;
+ }
+
+ public boolean delete(Path path) {
+ try {
+ if (dfs.exists(path)) {
+ LOG.debug("Removing HDFS directory {}", path.getName());
+ if (!dfs.delete(path, true)) {
+ LOG.debug("Failed to remove directory {}", path.getName());
+
+ return false;
+ }
+
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to remove HDFS directory {}: {}", path.getName(),
e.getMessage(), e);
+ }
+
+ return false;
+ }
+
+ public String readFile(Path filePath) throws IOException {
+ final FSDataInputStream streamReader = dfs.open(filePath);
+ final Scanner scanner = new Scanner(streamReader);
+
+ StringBuilder sb = new StringBuilder();
+ while (scanner.hasNextLine()) {
+ sb.append(scanner.nextLine());
+ }
+
+ return sb.toString();
+ }
+
+ public String readFile(String filePath) throws IOException {
+ return readFile(new Path(filePath));
+ }
+
+ public int countFiles(Path path) throws IOException {
+ RemoteIterator<LocatedFileStatus> i = dfs.listFiles(path, false);
+ int files = 0;
+ while (i.hasNext()) {
+ files++;
+ i.next();
+ }
+
+ return files;
+ }
+
+ /**
+ * Checks if a set of (minimum number of) files was created on the given
path representing a directory
+ * @param path the path to check for the files
+ * @param minFiles the number of files created (using 0 just checks if the
directory is there)
+ * @return true if the path contains at least minFiles and false otherwise
+ */
+ public boolean filesCreated(Path path, int minFiles) {
+ try {
+ return countFiles(path) >= minFiles;
+ } catch (Exception e) {
+ LOG.warn("I/O exception while checking if file {} exists",
path.getName());
+
+ return false;
+ }
+ }
+
+ public boolean filesCreated(Path path) {
+ return filesCreated(path, 0);
+ }
+
+ public boolean filesCreated(String path) {
+ return filesCreated(new Path(path));
+ }
+
+ public boolean exists(Path path) {
+ try {
+ return dfs.exists(path);
+ } catch (Exception e) {
+ LOG.warn("I/O exception while checking if file {} exists",
path.getName());
+
+ return false;
+ }
+ }
+}
diff --git a/tests/itests-hdfs/src/test/resources/hdfs-site.xml
b/tests/itests-hdfs/src/test/resources/hdfs-site.xml
new file mode 100644
index 0000000..c68b177
--- /dev/null
+++ b/tests/itests-hdfs/src/test/resources/hdfs-site.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<!-- This one is used by the clients (both the Camel one as well as the
integration test one) -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+ <property>
+ <name>dfs.client.use.datanode.hostname</name>
+ <value>true</value>
+ </property>
+</configuration>
diff --git
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/Dockerfile
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/Dockerfile
new file mode 100644
index 0000000..1bba317
--- /dev/null
+++
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/Dockerfile
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# Stage 1: download and unpack the Hadoop distribution
FROM fedora:31 as builder
ARG HADOOP_VERSION
ENV HADOOP_VERSION ${HADOOP_VERSION:-2.10.0}
RUN curl https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz -o hadoop.tar.gz
RUN mkdir -p hadoop && tar --strip-components=1 -xvf hadoop.tar.gz -C /hadoop && rm -f hadoop.tar.gz

# Stage 2: runtime image with only the JRE and the unpacked distribution
FROM fedora:31
# MAINTAINER is deprecated in favor of LABEL
LABEL maintainer="Otavio Rodolfo Piske <[email protected]>"
ARG HADOOP_VERSION
ENV HADOOP_VERSION ${HADOOP_VERSION:-2.10.0}
EXPOSE 8020 9000 50010 50020 50070 50075
RUN dnf install -y java-1.8.0-openjdk-headless tar gzip rsync which procps-ng
ENV JAVA_HOME /etc/alternatives/jre
COPY --from=builder /hadoop /hadoop
ADD core-site.xml /hadoop/etc/hadoop/core-site.xml
ADD hdfs-site.xml /hadoop/etc/hadoop/hdfs-site.xml
ADD run-datanode.sh /hadoop
ADD run-namenode.sh /hadoop
RUN chmod +x /hadoop/*.sh
ENV HADOOP_HOME /hadoop
ENV HADOOP_LOG_DIR=${HADOOP_HOME}/logs
# Uncomment this line for enabling debug log
# ENV HADOOP_ROOT_LOGGER=DEBUG,RFA,console
VOLUME /hdfs/
WORKDIR /hadoop

# To run this image manually, follow these steps:

# Create a network:
# docker network create hadoop-tmp

# Namenode starts with (format + runtime):
# bin/hdfs --config /hadoop/etc/hadoop/ namenode -format && bin/hdfs --config /hadoop/etc/hadoop/ namenode

# All in one shot from docker:
# docker run --rm -it -p 8020:8020 -p 9000:9000 -p 50070:50070 --network hadoop-tmp --name namenode hadoop /hadoop/run-namenode.sh

# Datanode starts with:
# docker run -p 50075:50075 -p 50010:50010 -p 50020:50020 --rm -it --network hadoop-tmp --name datanode-1 hadoop /hadoop/run-datanode.sh datanode-1

# If you need more nodes, just increase the number on the datanode name (i.e.: datanode-2, datanode-3, etc)
# docker run --rm -it --network hadoop-tmp --name datanode-2 hadoop /hadoop/run-datanode.sh
diff --git
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/core-site.xml
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/core-site.xml
new file mode 100644
index 0000000..c817310
--- /dev/null
+++
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/core-site.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>hdfs://namenode:8020</value>
+ </property>
+</configuration>
\ No newline at end of file
diff --git
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/hdfs-site.xml
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/hdfs-site.xml
new file mode 100644
index 0000000..6a09c85
--- /dev/null
+++
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/hdfs-site.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<!-- This one is used by the HDFS instance on the containers -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+ <property>
+ <name>dfs.namenode.name.dir</name>
+ <value>file:///hdfs/namenode</value>
+ </property>
+ <property>
+ <name>dfs.datanode.data.dir</name>
+ <value>file:///hdfs/datanode</value>
+ </property>
+ <property>
+ <name>dfs.permissions.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>dfs.client.use.datanode.hostname</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>dfs.datanode.use.datanode.hostname</name>
+ <value>true</value>
+ </property>
+</configuration>
diff --git
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-datanode.sh
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-datanode.sh
new file mode 100755
index 0000000..d1cfc3d
--- /dev/null
+++
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-datanode.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# Run a Hadoop datanode using the configuration bundled in the image.
# The Dockerfile exports HADOOP_HOME (the original referenced the undefined HDP_HOME,
# which made the cd a silent no-op that only worked because WORKDIR is /hadoop).
cd "${HADOOP_HOME}" || exit 1

bin/hdfs --config /hadoop/etc/hadoop/ datanode
\ No newline at end of file
diff --git
a/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-namenode.sh
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-namenode.sh
new file mode 100755
index 0000000..269d7c1
--- /dev/null
+++
b/tests/itests-hdfs/src/test/resources/org/apache/camel/kafkaconnector/hdfs/services/run-namenode.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# Format the HDFS filesystem and run the namenode in the foreground.
# The Dockerfile exports HADOOP_HOME (the original referenced the undefined HDP_HOME,
# which made the cd a silent no-op that only worked because WORKDIR is /hadoop).
cd "${HADOOP_HOME}" || exit 1

echo "Formatting the HDFS"
bin/hdfs --config /hadoop/etc/hadoop/ namenode -format

echo "Running Hadoop's namenode"
bin/hdfs --config /hadoop/etc/hadoop/ namenode
\ No newline at end of file
diff --git a/tests/pom.xml b/tests/pom.xml
index 94142f9..943cec2 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -51,6 +51,7 @@
<module>itests-timer</module>
<module>itests-slack</module>
<module>itests-salesforce</module>
+ <module>itests-hdfs</module>
</modules>