This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fca4eb  NIFI-9624 Removed JCenter Repository
6fca4eb is described below

commit 6fca4eb3ce99c140c055286ba4004f12fbcc0716
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Jan 24 14:04:37 2022 -0600

    NIFI-9624 Removed JCenter Repository
    
    - Removed moquette-broker and refactored tests
    - Upgraded docker-compose-rule-junit4 to 1.7.0 for Maven Central retrieval
    - Changed JSTL dependency groupId for Maven Central retrieval
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5719.
---
 .../minifi-c2/minifi-c2-integration-tests/pom.xml  |   2 +-
 minifi/minifi-integration-tests/pom.xml            |   2 +-
 .../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml  |  12 --
 .../nifi/processors/mqtt/TestConsumeMQTT.java      |  81 +++-------
 .../nifi/processors/mqtt/TestPublishMQTT.java      |  33 +---
 .../mqtt/common/TestConsumeMqttCommon.java         | 175 ++++++---------------
 .../mqtt/common/TestPublishMqttCommon.java         |   8 +-
 .../mqtt/integration/TestConsumeMQTT.java          | 133 ----------------
 .../TestPublishAndSubscribeMqttIntegration.java    | 147 -----------------
 .../mqtt/integration/TestPublishMQTT.java          |  84 ----------
 .../nifi-registry-web-docs/pom.xml                 |   2 +-
 nifi-registry/pom.xml                              |  22 ---
 pom.xml                                            |  10 --
 13 files changed, 84 insertions(+), 627 deletions(-)

diff --git a/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml b/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml
index b8a4c5b..2f86bdc 100644
--- a/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml
+++ b/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml
@@ -56,7 +56,7 @@ limitations under the License.
         <dependency>
             <groupId>com.palantir.docker.compose</groupId>
             <artifactId>docker-compose-rule-junit4</artifactId>
-            <version>1.5.0</version>
+            <version>1.7.0</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/minifi/minifi-integration-tests/pom.xml b/minifi/minifi-integration-tests/pom.xml
index a93b25b..4512f5e 100644
--- a/minifi/minifi-integration-tests/pom.xml
+++ b/minifi/minifi-integration-tests/pom.xml
@@ -40,7 +40,7 @@ limitations under the License.
         <dependency>
             <groupId>com.palantir.docker.compose</groupId>
             <artifactId>docker-compose-rule-junit4</artifactId>
-            <version>1.5.0</version>
+            <version>1.7.0</version>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index cfb2a2a..8f399c5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -63,18 +63,6 @@
 
         <!-- Test dependencies -->
         <dependency>
-            <groupId>io.moquette</groupId>
-            <artifactId>moquette-broker</artifactId>
-            <version>0.8.1</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 861a606..102c64f 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.mqtt;
 
-import io.moquette.proto.messages.PublishMessage;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
@@ -34,20 +33,18 @@ import org.eclipse.paho.client.mqttv3.IMqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.FilenameFilter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 import java.util.concurrent.BlockingQueue;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -69,12 +66,13 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
         }
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void setTlsConfiguration() {
         tlsConfiguration = new TemporaryKeyStoreBuilder().build();
     }
 
-    @Before
+    @BeforeEach
+
     public void init() {
         PUBLISH_WAIT_MS = 0;
 
@@ -111,11 +109,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
         processor.onScheduled(runner.getProcessContext());
     }
 
-    /**
-     * If the session.commit() fails, we should not remove the unprocessed 
message
-     */
     @Test
-    public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException {
+    public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException {
         testRunner.run(1, false);
         ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
         MQTTQueueMessage mock = mock(MQTTQueueMessage.class);
@@ -123,54 +118,26 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
         when(mock.getTopic()).thenReturn("testTopic");
         BlockingQueue<MQTTQueueMessage> mqttQueue = getMqttQueue(processor);
         mqttQueue.add(mock);
-        try {
-            ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
-            transferQueue(processor,
-                    (ProcessSession) 
Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { 
ProcessSession.class }, (proxy, method, args) -> {
-                        if (method.getName().equals("commitAsync")) {
-                            throw new RuntimeException();
-                        } else {
-                            return method.invoke(session, args);
-                        }
-                    }));
-            fail("Expected runtime exception");
-        } catch (InvocationTargetException e) {
-            assertTrue("Expected generic runtime exception, not " + e, 
e.getCause() instanceof RuntimeException);
-        }
-        assertTrue("Expected mqttQueue to contain uncommitted message.", 
mqttQueue.contains(mock));
-    }
 
-    @After
-    public void tearDown() {
-        if (MQTT_server != null) {
-            MQTT_server.stopServer();
-        }
-        final File folder =  new File("./target");
-        final File[] files = folder.listFiles( new FilenameFilter() {
-            @Override
-            public boolean accept( final File dir,
-                                   final String name ) {
-                return name.matches( "moquette_store.mapdb.*" );
-            }
-        } );
-        for ( final File file : files ) {
-            if ( !file.delete() ) {
-                System.err.println( "Can't remove " + file.getAbsolutePath() );
-            }
-        }
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+
+        assertThrows(InvocationTargetException.class, () -> transferQueue(processor,
+            (ProcessSession) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{ProcessSession.class}, (proxy, method, args) -> {
+                if (method.getName().equals("commitAsync")) {
+                    throw new RuntimeException();
+                } else {
+                    return method.invoke(session, args);
+                }
+            })));
+        assertTrue(mqttQueue.contains(mock));
     }
 
     @Override
-    public void internalPublish(PublishMessage publishMessage) {
-        MqttMessage mqttMessage = new MqttMessage();
-        mqttMessage.setPayload(publishMessage.getPayload().array());
-        mqttMessage.setRetained(publishMessage.isRetainFlag());
-        mqttMessage.setQos(publishMessage.getQos().ordinal());
-
+    public void internalPublish(final MqttMessage message, final String topicName) {
         try {
-            mqttTestClient.publish(publishMessage.getTopicName(), mqttMessage);
+            mqttTestClient.publish(topicName, message);
         } catch (MqttException e) {
-            fail("Should never get an MqttException when publishing using test 
client");
+            throw new RuntimeException(e);
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 9c886d2..1755365 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -24,16 +24,11 @@ import org.apache.nifi.util.TestRunners;
 import org.eclipse.paho.client.mqttv3.IMqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
 
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
 import java.util.Arrays;
 
-import static org.junit.Assert.assertEquals;
-
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestPublishMQTT extends TestPublishMqttCommon {
 
@@ -46,8 +41,7 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
         assertEquals(topic, mqttQueueMessage.getTopic());
     }
 
-
-    public MqttTestClient mqttTestClient;
+    private MqttTestClient mqttTestClient;
 
     public class UnitTestablePublishMqtt extends PublishMQTT {
 
@@ -62,8 +56,8 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
         }
     }
 
-    @Before
-    public void init() throws IOException {
+    @BeforeEach
+    public void init() {
         UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
         testRunner = TestRunners.newTestRunner(proc);
         testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
@@ -71,21 +65,4 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
         topic = "testTopic";
         testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
     }
-
-    @After
-    public void tearDown() throws Exception {
-        final File folder =  new File("./target");
-        final File[] files = folder.listFiles( new FilenameFilter() {
-            @Override
-            public boolean accept( final File dir,
-                                   final String name ) {
-                return name.matches( "moquette_store.mapdb.*" );
-            }
-        } );
-        for ( final File file : files ) {
-            if ( !file.delete() ) {
-                System.err.println( "Can't remove " + file.getAbsolutePath() );
-            }
-        }
-    }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index 6a5b5f1..0eec42c 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -17,10 +17,6 @@
 
 package org.apache.nifi.processors.mqtt.common;
 
-import io.moquette.proto.messages.AbstractMessage;
-import io.moquette.proto.messages.PublishMessage;
-import io.moquette.server.Server;
-
 import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
@@ -34,12 +30,13 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.eclipse.paho.client.mqttv3.IMqttClient;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -50,20 +47,26 @@ import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_
 import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
 import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public abstract class TestConsumeMqttCommon {
 
     public int PUBLISH_WAIT_MS = 1000;
     public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
 
-    public Server MQTT_server;
     public TestRunner testRunner;
     public String broker;
 
-    public abstract void internalPublish(PublishMessage publishMessage);
+    private static final String STRING_MESSAGE = "testMessage";
+    private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
+
+    private static final int MOST_ONE = 0;
+    private static final int LEAST_ONE = 1;
+    private static final int EXACTLY_ONCE = 2;
+
+    public abstract void internalPublish(MqttMessage message, String topicName);
 
     @Test
     public void testClientIDConfiguration() {
@@ -85,7 +88,7 @@ public abstract class TestConsumeMqttCommon {
     }
 
     @Test
-    public void testLastWillConfig() throws Exception {
+    public void testLastWillConfig() {
         testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill 
message");
         testRunner.assertNotValid();
         testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill 
topic");
@@ -111,14 +114,7 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
-        testMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -155,14 +151,7 @@ public abstract class TestConsumeMqttCommon {
 
         consumeMQTT.onUnscheduled(testRunner.getProcessContext());
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
-        testMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
 
         consumeMQTT.onScheduled(testRunner.getProcessContext());
         reconnect(consumeMQTT, testRunner.getProcessContext());
@@ -202,14 +191,7 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
-        testMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
+        publishMessage(STRING_MESSAGE, LEAST_ONE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -245,14 +227,7 @@ public abstract class TestConsumeMqttCommon {
 
         consumeMQTT.onUnscheduled(testRunner.getProcessContext());
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
-        testMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
+        publishMessage(STRING_MESSAGE, LEAST_ONE);
 
         consumeMQTT.onScheduled(testRunner.getProcessContext());
         reconnect(consumeMQTT, testRunner.getProcessContext());
@@ -292,14 +267,7 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        testMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
+        publishMessage(STRING_MESSAGE, MOST_ONE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -344,6 +312,7 @@ public abstract class TestConsumeMqttCommon {
 
         Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
         f.setAccessible(true);
+        @SuppressWarnings("unchecked")
         LinkedBlockingQueue<MQTTQueueMessage> queue = 
(LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
         queue.add(testMessage);
 
@@ -371,13 +340,6 @@ public abstract class TestConsumeMqttCommon {
 
         testRunner.assertValid();
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
-        testMessage.setRetainFlag(false);
-
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
         reconnect(consumeMQTT, testRunner.getProcessContext());
@@ -386,8 +348,8 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        internalPublish(testMessage);
-        internalPublish(testMessage);
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
         consumeMQTT.onUnscheduled(testRunner.getProcessContext());
@@ -439,36 +401,22 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache 
NiFi\"}".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        testMessage.setRetainFlag(false);
-
-        PublishMessage badMessage = new PublishMessage();
-        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
-        badMessage.setTopicName("testTopic");
-        badMessage.setDupFlag(false);
-        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        badMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
-        internalPublish(badMessage);
-        internalPublish(testMessage);
+        publishMessage(JSON_PAYLOAD, MOST_ONE);
+        publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
+        publishMessage(JSON_PAYLOAD, MOST_ONE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
         testRunner.run(1, false, false);
 
         List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertTrue(flowFiles.size() == 1);
+        assertEquals(1, flowFiles.size());
         assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
                 + "{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
                 new String(flowFiles.get(0).toByteArray()));
 
         List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertTrue(badFlowFiles.size() == 1);
+        assertEquals(1, badFlowFiles.size());
         assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
 
         // clean runner by removing records reader/writer
@@ -489,23 +437,9 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache 
NiFi\"}".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        testMessage.setRetainFlag(false);
-
-        PublishMessage badMessage = new PublishMessage();
-        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
-        badMessage.setTopicName("testTopic");
-        badMessage.setDupFlag(false);
-        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        badMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
-        internalPublish(badMessage);
-        internalPublish(testMessage);
+        publishMessage(JSON_PAYLOAD, MOST_ONE);
+        publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
+        publishMessage(JSON_PAYLOAD, MOST_ONE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
         Thread.sleep(PUBLISH_WAIT_MS);
@@ -520,7 +454,7 @@ public abstract class TestConsumeMqttCommon {
                 new String(flowFiles.get(0).toByteArray()));
 
         List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertTrue(badFlowFiles.size() == 0);
+        assertEquals(0, badFlowFiles.size());
 
         // clean runner by removing message demarcator
         testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
@@ -552,34 +486,20 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache 
NiFi\"}".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        testMessage.setRetainFlag(false);
-
-        PublishMessage badMessage = new PublishMessage();
-        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
-        badMessage.setTopicName("testTopic");
-        badMessage.setDupFlag(false);
-        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        badMessage.setRetainFlag(false);
-
-        internalPublish(testMessage);
-        internalPublish(badMessage);
-        internalPublish(testMessage);
+        publishMessage(JSON_PAYLOAD, LEAST_ONE);
+        publishMessage(THIS_IS_NOT_JSON, LEAST_ONE);
+        publishMessage(JSON_PAYLOAD, LEAST_ONE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
         testRunner.run(1, false, false);
 
         List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertTrue(flowFiles.size() == 1);
+        assertEquals(1, flowFiles.size());
         assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache 
NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
 
         List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertTrue(badFlowFiles.size() == 1);
+        assertEquals(1, badFlowFiles.size());
         assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
 
         // clean runner by removing records reader/writer
@@ -613,21 +533,14 @@ public abstract class TestConsumeMqttCommon {
 
         assertTrue(isConnected(consumeMQTT));
 
-        PublishMessage badMessage = new PublishMessage();
-        badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
-        badMessage.setTopicName("testTopic");
-        badMessage.setDupFlag(false);
-        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
-        badMessage.setRetainFlag(false);
-
-        internalPublish(badMessage);
+        publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
         testRunner.run(1, false, false);
 
         List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertTrue(badFlowFiles.size() == 1);
+        assertEquals(1, badFlowFiles.size());
         assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
 
         // clean runner by removing records reader/writer
@@ -643,12 +556,13 @@ public abstract class TestConsumeMqttCommon {
     }
 
 
-    public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+    public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
         Method method = 
ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
         method.setAccessible(true);
         method.invoke(processor, context);
     }
 
+    @SuppressWarnings("unchecked")
     public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT 
consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
         Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
         mqttQueueField.setAccessible(true);
@@ -669,4 +583,13 @@ public abstract class TestConsumeMqttCommon {
             assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvents.get(0).getEventType());
         }
     }
+
+    private void publishMessage(final String payload, final int qos) {
+        final MqttMessage message = new MqttMessage();
+        message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
+        message.setQos(qos);
+        message.setRetained(false);
+
+        internalPublish(message, "testTopic");
+    }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
index fc560af..d82bc4d 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
@@ -17,23 +17,21 @@
 
 package org.apache.nifi.processors.mqtt.common;
 
-import io.moquette.server.Server;
 import org.apache.nifi.processors.mqtt.PublishMQTT;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.TestRunner;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
 import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public abstract class TestPublishMqttCommon {
 
-    public Server MQTT_server;
     public TestRunner testRunner;
     public String topic;
 
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
deleted file mode 100644
index d7ed0e0..0000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.integration;
-
-import io.moquette.BrokerConstants;
-import io.moquette.proto.messages.AbstractMessage;
-import io.moquette.proto.messages.PublishMessage;
-import io.moquette.server.Server;
-import io.moquette.server.config.IConfig;
-import io.moquette.server.config.MemoryConfig;
-import org.apache.nifi.processors.mqtt.ConsumeMQTT;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
-
-
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
-
-
-    private void startServer() throws IOException {
-        MQTT_server = new Server();
-        final Properties configProps = new Properties();
-        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
-        configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
-        IConfig server_config = new MemoryConfig(configProps);
-        MQTT_server.startServer(server_config);
-    }
-
-    @Before
-    public void init() throws IOException {
-        startServer();
-
-        broker = "tcp://localhost:1883";
-        testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
-        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
-        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
-        testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
-        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (MQTT_server != null) {
-            MQTT_server.stopServer();
-        }
-        final File folder =  new File("./target");
-        final File[] files = folder.listFiles( new FilenameFilter() {
-            @Override
-            public boolean accept( final File dir,
-                                   final String name ) {
-                return name.matches( "moquette_store.mapdb.*" );
-            }
-        } );
-        for ( final File file : files ) {
-            if ( !file.delete() ) {
-                System.err.println( "Can't remove " + file.getAbsolutePath() );
-            }
-        }
-    }
-
-    @Test
-    public void testRetainedQoS2() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-
-        testRunner.assertValid();
-
-        PublishMessage testMessage = new PublishMessage();
-        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
-        testMessage.setTopicName("testTopic");
-        testMessage.setDupFlag(false);
-        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
-        testMessage.setRetainFlag(true);
-
-        internalPublish(testMessage);
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-
-        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true");
-    }
-
-    @Override
-    public void internalPublish(PublishMessage publishMessage) {
-        MQTT_server.internalPublish(publishMessage);
-    }
-}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
deleted file mode 100644
index dc09ce1..0000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.integration;
-
-import io.moquette.BrokerConstants;
-import io.moquette.server.Server;
-import io.moquette.server.config.IConfig;
-import io.moquette.server.config.MemoryConfig;
-import org.apache.nifi.processors.mqtt.ConsumeMQTT;
-import org.apache.nifi.processors.mqtt.PublishMQTT;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-import java.util.Properties;
-
-import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
-import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
-import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
-import static 
org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon.reconnect;
-
-public class TestPublishAndSubscribeMqttIntegration {
-    private TestRunner testSubscribeRunner;
-    private TestRunner testPublishRunner;
-    private Server MQTT_server;
-
-    private static int PUBLISH_WAIT_MS = 1000;
-
-    private void startServer() throws IOException {
-        MQTT_server = new Server();
-        final Properties configProps = new Properties();
-        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
-        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
-        IConfig server_config = new MemoryConfig(configProps);
-        MQTT_server.startServer(server_config);
-    }
-
-    @Before
-    public void init() throws IOException {
-        startServer();
-        testSubscribeRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
-        testPublishRunner = TestRunners.newTestRunner(PublishMQTT.class);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (MQTT_server != null) {
-            MQTT_server.stopServer();
-        }
-        final File folder =  new File("./target");
-        final File[] files = folder.listFiles( new FilenameFilter() {
-            @Override
-            public boolean accept( final File dir,
-                                   final String name ) {
-                return name.matches( "moquette_store.mapdb.*" );
-            }
-        } );
-        for ( final File file : files ) {
-            if ( !file.delete() ) {
-                System.err.println( "Can't remove " + file.getAbsolutePath() );
-            }
-        }
-    }
-
-    @Test
-    public void testBasic() throws Exception {
-        subscribe();
-        publishAndVerify();
-        Thread.sleep(PUBLISH_WAIT_MS);
-        testSubscribeRunner.run();
-        subscribeVerify();
-    }
-
-    private void publishAndVerify(){
-        testPublishRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        testPublishRunner.setProperty(PublishMQTT.PROP_CLIENTID, 
"TestPublishClient");
-        testPublishRunner.setProperty(PublishMQTT.PROP_QOS, "2");
-        testPublishRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
-        testPublishRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
-
-        testPublishRunner.assertValid();
-
-        String testMessage = "testMessage";
-        testPublishRunner.enqueue(testMessage.getBytes());
-
-        testPublishRunner.run();
-
-        testPublishRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-        testPublishRunner.assertTransferCount(REL_SUCCESS, 1);
-    }
-
-    private void subscribe() throws IOException, ClassNotFoundException, MqttException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
-        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, 
"TestSubscribeClient");
-        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, 
"testTopic");
-        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, 
"100");
-
-        testSubscribeRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testSubscribeRunner.getProcessor();
-        consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext());
-        reconnect(consumeMQTT, testSubscribeRunner.getProcessContext());
-    }
-
-    private void subscribeVerify(){
-        testSubscribeRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-
-        List<MockFlowFile> flowFiles = 
testSubscribeRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, 
"tcp://localhost:1883");
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
deleted file mode 100644
index 67c883e..0000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.integration;
-
-import io.moquette.BrokerConstants;
-import io.moquette.server.Server;
-import io.moquette.server.config.IConfig;
-import io.moquette.server.config.MemoryConfig;
-import org.apache.nifi.processors.mqtt.PublishMQTT;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.Properties;
-
-import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
-
-
-public class TestPublishMQTT extends TestPublishMqttCommon {
-
-
-    private void startServer() throws IOException {
-        MQTT_server = new Server();
-        final Properties configProps = new Properties();
-        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
-        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
-        IConfig server_config = new MemoryConfig(configProps);
-        MQTT_server.startServer(server_config);
-    }
-
-    @Before
-    public void init() throws IOException {
-        startServer();
-        testRunner = TestRunners.newTestRunner(PublishMQTT.class);
-        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
-        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
-        testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
-    }
-
-    @After
-    public void tearDown() {
-        if (MQTT_server != null) {
-            MQTT_server.stopServer();
-        }
-        final File folder =  new File("./target");
-        final File[] files = folder.listFiles( new FilenameFilter() {
-            @Override
-            public boolean accept( final File dir,
-                                   final String name ) {
-                return name.matches( "moquette_store.mapdb.*" );
-            }
-        } );
-        for ( final File file : files ) {
-            if ( !file.delete() ) {
-                System.err.println( "Can't remove " + file.getAbsolutePath() );
-            }
-        }
-    }
-
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        //Cannot verify published message without subscribing and consuming it which is outside the scope of this test.
-    }
-}
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml b/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml
index 48df689..0900532 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml
+++ b/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml
@@ -60,7 +60,7 @@
         <!-- Needed this dependency to resolve the taglib inside documentation.jsp, otherwise an error is encountered
                 "The absolute uri: http://java.sun.com/jsp/jstl/core cannot be resolved" -->
         <dependency>
-            <groupId>javax.servlet.jsp.jstl</groupId>
+            <groupId>javax.servlet</groupId>
             <artifactId>jstl</artifactId>
             <version>1.2</version>
         </dependency>
diff --git a/nifi-registry/pom.xml b/nifi-registry/pom.xml
index 3a4c9dd..ee34b8e 100644
--- a/nifi-registry/pom.xml
+++ b/nifi-registry/pom.xml
@@ -48,28 +48,6 @@
         <jgit.version>5.13.0.202109080827-r</jgit.version>
     </properties>
 
-    <pluginRepositories>
-        <pluginRepository>
-            <id>jcenter-releases</id>
-            <name>jcenter</name>
-            <url>https://jcenter.bintray.com</url>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </pluginRepository>
-        <pluginRepository>
-            <id>bintray</id>
-            <name>Groovy Bintray</name>
-            <url>https://dl.bintray.com/groovy/maven</url>
-            <releases>
-                <updatePolicy>never</updatePolicy>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </pluginRepository>
-    </pluginRepositories>
-
     <dependencyManagement>
         <dependencies>
             <!-- Logging dependencies that will be directly in lib -->
diff --git a/pom.xml b/pom.xml
index 39ff28f..10e1530 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,16 +149,6 @@
             </snapshots>
         </repository>
         <repository>
-            <id>jcenter</id>
-            <url>https://jcenter.bintray.com</url>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-        </repository>
-        <repository>
             <id>Shibboleth</id>
             <name>Shibboleth</name>
             
<url>https://build.shibboleth.net/nexus/content/repositories/releases/</url>

Reply via email to