This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6fca4eb NIFI-9624 Removed JCenter Repository
6fca4eb is described below
commit 6fca4eb3ce99c140c055286ba4004f12fbcc0716
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Jan 24 14:04:37 2022 -0600
NIFI-9624 Removed JCenter Repository
- Removed moquette-broker and refactored tests
- Upgraded docker-compose-rule-junit4 to 1.7.0 for Maven Central retrieval
- Changed JSTL dependency groupId for Maven Central retrieval
Signed-off-by: Joe Gresock <[email protected]>
This closes #5719.
---
.../minifi-c2/minifi-c2-integration-tests/pom.xml | 2 +-
minifi/minifi-integration-tests/pom.xml | 2 +-
.../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml | 12 --
.../nifi/processors/mqtt/TestConsumeMQTT.java | 81 +++-------
.../nifi/processors/mqtt/TestPublishMQTT.java | 33 +---
.../mqtt/common/TestConsumeMqttCommon.java | 175 ++++++---------------
.../mqtt/common/TestPublishMqttCommon.java | 8 +-
.../mqtt/integration/TestConsumeMQTT.java | 133 ----------------
.../TestPublishAndSubscribeMqttIntegration.java | 147 -----------------
.../mqtt/integration/TestPublishMQTT.java | 84 ----------
.../nifi-registry-web-docs/pom.xml | 2 +-
nifi-registry/pom.xml | 22 ---
pom.xml | 10 --
13 files changed, 84 insertions(+), 627 deletions(-)
diff --git a/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml b/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml
index b8a4c5b..2f86bdc 100644
--- a/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml
+++ b/minifi/minifi-c2/minifi-c2-integration-tests/pom.xml
@@ -56,7 +56,7 @@ limitations under the License.
<dependency>
<groupId>com.palantir.docker.compose</groupId>
<artifactId>docker-compose-rule-junit4</artifactId>
- <version>1.5.0</version>
+ <version>1.7.0</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/minifi/minifi-integration-tests/pom.xml b/minifi/minifi-integration-tests/pom.xml
index a93b25b..4512f5e 100644
--- a/minifi/minifi-integration-tests/pom.xml
+++ b/minifi/minifi-integration-tests/pom.xml
@@ -40,7 +40,7 @@ limitations under the License.
<dependency>
<groupId>com.palantir.docker.compose</groupId>
<artifactId>docker-compose-rule-junit4</artifactId>
- <version>1.5.0</version>
+ <version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index cfb2a2a..8f399c5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -63,18 +63,6 @@
<!-- Test dependencies -->
<dependency>
- <groupId>io.moquette</groupId>
- <artifactId>moquette-broker</artifactId>
- <version>0.8.1</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 861a606..102c64f 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.mqtt;
-import io.moquette.proto.messages.PublishMessage;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
@@ -34,20 +33,18 @@ import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.FilenameFilter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -69,12 +66,13 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
}
}
- @BeforeClass
+ @BeforeAll
public static void setTlsConfiguration() {
tlsConfiguration = new TemporaryKeyStoreBuilder().build();
}
- @Before
+ @BeforeEach
+
public void init() {
PUBLISH_WAIT_MS = 0;
@@ -111,11 +109,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon
{
processor.onScheduled(runner.getProcessContext());
}
- /**
- * If the session.commit() fails, we should not remove the unprocessed
message
- */
@Test
- public void testMessageNotConsumedOnCommitFail() throws
NoSuchFieldException, IllegalAccessException, NoSuchMethodException {
+ public void testMessageNotConsumedOnCommitFail() throws
NoSuchFieldException, IllegalAccessException {
testRunner.run(1, false);
ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
MQTTQueueMessage mock = mock(MQTTQueueMessage.class);
@@ -123,54 +118,26 @@ public class TestConsumeMQTT extends
TestConsumeMqttCommon {
when(mock.getTopic()).thenReturn("testTopic");
BlockingQueue<MQTTQueueMessage> mqttQueue = getMqttQueue(processor);
mqttQueue.add(mock);
- try {
- ProcessSession session =
testRunner.getProcessSessionFactory().createSession();
- transferQueue(processor,
- (ProcessSession)
Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {
ProcessSession.class }, (proxy, method, args) -> {
- if (method.getName().equals("commitAsync")) {
- throw new RuntimeException();
- } else {
- return method.invoke(session, args);
- }
- }));
- fail("Expected runtime exception");
- } catch (InvocationTargetException e) {
- assertTrue("Expected generic runtime exception, not " + e,
e.getCause() instanceof RuntimeException);
- }
- assertTrue("Expected mqttQueue to contain uncommitted message.",
mqttQueue.contains(mock));
- }
- @After
- public void tearDown() {
- if (MQTT_server != null) {
- MQTT_server.stopServer();
- }
- final File folder = new File("./target");
- final File[] files = folder.listFiles( new FilenameFilter() {
- @Override
- public boolean accept( final File dir,
- final String name ) {
- return name.matches( "moquette_store.mapdb.*" );
- }
- } );
- for ( final File file : files ) {
- if ( !file.delete() ) {
- System.err.println( "Can't remove " + file.getAbsolutePath() );
- }
- }
+ ProcessSession session =
testRunner.getProcessSessionFactory().createSession();
+
+ assertThrows(InvocationTargetException.class, () ->
transferQueue(processor,
+ (ProcessSession)
Proxy.newProxyInstance(getClass().getClassLoader(), new
Class[]{ProcessSession.class}, (proxy, method, args) -> {
+ if (method.getName().equals("commitAsync")) {
+ throw new RuntimeException();
+ } else {
+ return method.invoke(session, args);
+ }
+ })));
+ assertTrue(mqttQueue.contains(mock));
}
@Override
- public void internalPublish(PublishMessage publishMessage) {
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setPayload(publishMessage.getPayload().array());
- mqttMessage.setRetained(publishMessage.isRetainFlag());
- mqttMessage.setQos(publishMessage.getQos().ordinal());
-
+ public void internalPublish(final MqttMessage message, final String
topicName) {
try {
- mqttTestClient.publish(publishMessage.getTopicName(), mqttMessage);
+ mqttTestClient.publish(topicName, message);
} catch (MqttException e) {
- fail("Should never get an MqttException when publishing using test
client");
+ throw new RuntimeException(e);
}
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 9c886d2..1755365 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -24,16 +24,11 @@ import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
import java.util.Arrays;
-import static org.junit.Assert.assertEquals;
-
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestPublishMQTT extends TestPublishMqttCommon {
@@ -46,8 +41,7 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
assertEquals(topic, mqttQueueMessage.getTopic());
}
-
- public MqttTestClient mqttTestClient;
+ private MqttTestClient mqttTestClient;
public class UnitTestablePublishMqtt extends PublishMQTT {
@@ -62,8 +56,8 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
}
}
- @Before
- public void init() throws IOException {
+ @BeforeEach
+ public void init() {
UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(PublishMQTT.PROP_BROKER_URI,
"tcp://localhost:1883");
@@ -71,21 +65,4 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
topic = "testTopic";
testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
}
-
- @After
- public void tearDown() throws Exception {
- final File folder = new File("./target");
- final File[] files = folder.listFiles( new FilenameFilter() {
- @Override
- public boolean accept( final File dir,
- final String name ) {
- return name.matches( "moquette_store.mapdb.*" );
- }
- } );
- for ( final File file : files ) {
- if ( !file.delete() ) {
- System.err.println( "Can't remove " + file.getAbsolutePath() );
- }
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index 6a5b5f1..0eec42c 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -17,10 +17,6 @@
package org.apache.nifi.processors.mqtt.common;
-import io.moquette.proto.messages.AbstractMessage;
-import io.moquette.proto.messages.PublishMessage;
-import io.moquette.server.Server;
-
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
@@ -34,12 +30,13 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -50,20 +47,26 @@ import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class TestConsumeMqttCommon {
public int PUBLISH_WAIT_MS = 1000;
public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
- public Server MQTT_server;
public TestRunner testRunner;
public String broker;
- public abstract void internalPublish(PublishMessage publishMessage);
+ private static final String STRING_MESSAGE = "testMessage";
+ private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
+
+ private static final int MOST_ONE = 0;
+ private static final int LEAST_ONE = 1;
+ private static final int EXACTLY_ONCE = 2;
+
+ public abstract void internalPublish(MqttMessage message, String
topicName);
@Test
public void testClientIDConfiguration() {
@@ -85,7 +88,7 @@ public abstract class TestConsumeMqttCommon {
}
@Test
- public void testLastWillConfig() throws Exception {
+ public void testLastWillConfig() {
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill
message");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill
topic");
@@ -111,14 +114,7 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
- testMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
@@ -155,14 +151,7 @@ public abstract class TestConsumeMqttCommon {
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
- testMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
@@ -202,14 +191,7 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
- testMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
+ publishMessage(STRING_MESSAGE, LEAST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
@@ -245,14 +227,7 @@ public abstract class TestConsumeMqttCommon {
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
- testMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
+ publishMessage(STRING_MESSAGE, LEAST_ONE);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
@@ -292,14 +267,7 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- testMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
+ publishMessage(STRING_MESSAGE, MOST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
@@ -344,6 +312,7 @@ public abstract class TestConsumeMqttCommon {
Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true);
+ @SuppressWarnings("unchecked")
LinkedBlockingQueue<MQTTQueueMessage> queue =
(LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
queue.add(testMessage);
@@ -371,13 +340,6 @@ public abstract class TestConsumeMqttCommon {
testRunner.assertValid();
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
- testMessage.setRetainFlag(false);
-
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
@@ -386,8 +348,8 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- internalPublish(testMessage);
- internalPublish(testMessage);
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
@@ -439,36 +401,22 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache
NiFi\"}".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- testMessage.setRetainFlag(false);
-
- PublishMessage badMessage = new PublishMessage();
- badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
- badMessage.setTopicName("testTopic");
- badMessage.setDupFlag(false);
- badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- badMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
- internalPublish(badMessage);
- internalPublish(testMessage);
+ publishMessage(JSON_PAYLOAD, MOST_ONE);
+ publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
+ publishMessage(JSON_PAYLOAD, MOST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertTrue(flowFiles.size() == 1);
+ assertEquals(1, flowFiles.size());
assertEquals("[{\"name\":\"Apache
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+ "{\"name\":\"Apache
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertTrue(badFlowFiles.size() == 1);
+ assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new
String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
@@ -489,23 +437,9 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache
NiFi\"}".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- testMessage.setRetainFlag(false);
-
- PublishMessage badMessage = new PublishMessage();
- badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
- badMessage.setTopicName("testTopic");
- badMessage.setDupFlag(false);
- badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- badMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
- internalPublish(badMessage);
- internalPublish(testMessage);
+ publishMessage(JSON_PAYLOAD, MOST_ONE);
+ publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
+ publishMessage(JSON_PAYLOAD, MOST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
Thread.sleep(PUBLISH_WAIT_MS);
@@ -520,7 +454,7 @@ public abstract class TestConsumeMqttCommon {
new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertTrue(badFlowFiles.size() == 0);
+ assertEquals(0, badFlowFiles.size());
// clean runner by removing message demarcator
testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
@@ -552,34 +486,20 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache
NiFi\"}".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- testMessage.setRetainFlag(false);
-
- PublishMessage badMessage = new PublishMessage();
- badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
- badMessage.setTopicName("testTopic");
- badMessage.setDupFlag(false);
- badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- badMessage.setRetainFlag(false);
-
- internalPublish(testMessage);
- internalPublish(badMessage);
- internalPublish(testMessage);
+ publishMessage(JSON_PAYLOAD, LEAST_ONE);
+ publishMessage(THIS_IS_NOT_JSON, LEAST_ONE);
+ publishMessage(JSON_PAYLOAD, LEAST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertTrue(flowFiles.size() == 1);
+ assertEquals(1, flowFiles.size());
assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache
NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertTrue(badFlowFiles.size() == 1);
+ assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new
String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
@@ -613,21 +533,14 @@ public abstract class TestConsumeMqttCommon {
assertTrue(isConnected(consumeMQTT));
- PublishMessage badMessage = new PublishMessage();
- badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
- badMessage.setTopicName("testTopic");
- badMessage.setDupFlag(false);
- badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
- badMessage.setRetainFlag(false);
-
- internalPublish(badMessage);
+ publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> badFlowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertTrue(badFlowFiles.size() == 1);
+ assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new
String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
@@ -643,12 +556,13 @@ public abstract class TestConsumeMqttCommon {
}
- public static void reconnect(ConsumeMQTT processor, ProcessContext
context) throws NoSuchFieldException, IllegalAccessException,
NoSuchMethodException, InvocationTargetException {
+ public static void reconnect(ConsumeMQTT processor, ProcessContext
context) throws IllegalAccessException, NoSuchMethodException,
InvocationTargetException {
Method method =
ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
method.setAccessible(true);
method.invoke(processor, context);
}
+ @SuppressWarnings("unchecked")
public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT
consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
mqttQueueField.setAccessible(true);
@@ -669,4 +583,13 @@ public abstract class TestConsumeMqttCommon {
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvents.get(0).getEventType());
}
}
+
+ private void publishMessage(final String payload, final int qos) {
+ final MqttMessage message = new MqttMessage();
+ message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
+ message.setQos(qos);
+ message.setRetained(false);
+
+ internalPublish(message, "testTopic");
+ }
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
index fc560af..d82bc4d 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
@@ -17,23 +17,21 @@
package org.apache.nifi.processors.mqtt.common;
-import io.moquette.server.Server;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.TestRunner;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public abstract class TestPublishMqttCommon {
- public Server MQTT_server;
public TestRunner testRunner;
public String topic;
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
deleted file mode 100644
index d7ed0e0..0000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.integration;
-
-import io.moquette.BrokerConstants;
-import io.moquette.proto.messages.AbstractMessage;
-import io.moquette.proto.messages.PublishMessage;
-import io.moquette.server.Server;
-import io.moquette.server.config.IConfig;
-import io.moquette.server.config.MemoryConfig;
-import org.apache.nifi.processors.mqtt.ConsumeMQTT;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
-import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
-import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
-
-
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
-
-
- private void startServer() throws IOException {
- MQTT_server = new Server();
- final Properties configProps = new Properties();
- configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
-
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
- IConfig server_config = new MemoryConfig(configProps);
- MQTT_server.startServer(server_config);
- }
-
- @Before
- public void init() throws IOException {
- startServer();
-
- broker = "tcp://localhost:1883";
- testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
- testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
- testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
- testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
- testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
- }
-
- @After
- public void tearDown() throws Exception {
- if (MQTT_server != null) {
- MQTT_server.stopServer();
- }
- final File folder = new File("./target");
- final File[] files = folder.listFiles( new FilenameFilter() {
- @Override
- public boolean accept( final File dir,
- final String name ) {
- return name.matches( "moquette_store.mapdb.*" );
- }
- } );
- for ( final File file : files ) {
- if ( !file.delete() ) {
- System.err.println( "Can't remove " + file.getAbsolutePath() );
- }
- }
- }
-
- @Test
- public void testRetainedQoS2() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-
- testRunner.assertValid();
-
- PublishMessage testMessage = new PublishMessage();
- testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
- testMessage.setTopicName("testTopic");
- testMessage.setDupFlag(false);
- testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
- testMessage.setRetainFlag(true);
-
- internalPublish(testMessage);
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-
- List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true");
- }
-
- @Override
- public void internalPublish(PublishMessage publishMessage) {
- MQTT_server.internalPublish(publishMessage);
- }
-}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
deleted file mode 100644
index dc09ce1..0000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.integration;
-
-import io.moquette.BrokerConstants;
-import io.moquette.server.Server;
-import io.moquette.server.config.IConfig;
-import io.moquette.server.config.MemoryConfig;
-import org.apache.nifi.processors.mqtt.ConsumeMQTT;
-import org.apache.nifi.processors.mqtt.PublishMQTT;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-import java.util.Properties;
-
-import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
-import static org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon.reconnect;
-
-public class TestPublishAndSubscribeMqttIntegration {
- private TestRunner testSubscribeRunner;
- private TestRunner testPublishRunner;
- private Server MQTT_server;
-
- private static int PUBLISH_WAIT_MS = 1000;
-
- private void startServer() throws IOException {
- MQTT_server = new Server();
- final Properties configProps = new Properties();
- configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
- configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
- IConfig server_config = new MemoryConfig(configProps);
- MQTT_server.startServer(server_config);
- }
-
- @Before
- public void init() throws IOException {
- startServer();
- testSubscribeRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
- testPublishRunner = TestRunners.newTestRunner(PublishMQTT.class);
- }
-
- @After
- public void tearDown() throws Exception {
- if (MQTT_server != null) {
- MQTT_server.stopServer();
- }
- final File folder = new File("./target");
- final File[] files = folder.listFiles( new FilenameFilter() {
- @Override
- public boolean accept( final File dir,
- final String name ) {
- return name.matches( "moquette_store.mapdb.*" );
- }
- } );
- for ( final File file : files ) {
- if ( !file.delete() ) {
- System.err.println( "Can't remove " + file.getAbsolutePath() );
- }
- }
- }
-
- @Test
- public void testBasic() throws Exception {
- subscribe();
- publishAndVerify();
- Thread.sleep(PUBLISH_WAIT_MS);
- testSubscribeRunner.run();
- subscribeVerify();
- }
-
- private void publishAndVerify(){
- testPublishRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
- testPublishRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestPublishClient");
- testPublishRunner.setProperty(PublishMQTT.PROP_QOS, "2");
- testPublishRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
- testPublishRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
-
- testPublishRunner.assertValid();
-
- String testMessage = "testMessage";
- testPublishRunner.enqueue(testMessage.getBytes());
-
- testPublishRunner.run();
-
- testPublishRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
- testPublishRunner.assertTransferCount(REL_SUCCESS, 1);
- }
-
- private void subscribe() throws IOException, ClassNotFoundException, MqttException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
- testSubscribeRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
- testSubscribeRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestSubscribeClient");
- testSubscribeRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
- testSubscribeRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
- testSubscribeRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
-
- testSubscribeRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor();
- consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext());
- reconnect(consumeMQTT, testSubscribeRunner.getProcessContext());
- }
-
- private void subscribeVerify(){
- testSubscribeRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-
- List<MockFlowFile> flowFiles = testSubscribeRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, "tcp://localhost:1883");
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
deleted file mode 100644
index 67c883e..0000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.integration;
-
-import io.moquette.BrokerConstants;
-import io.moquette.server.Server;
-import io.moquette.server.config.IConfig;
-import io.moquette.server.config.MemoryConfig;
-import org.apache.nifi.processors.mqtt.PublishMQTT;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.Properties;
-
-import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
-
-
-public class TestPublishMQTT extends TestPublishMqttCommon {
-
-
- private void startServer() throws IOException {
- MQTT_server = new Server();
- final Properties configProps = new Properties();
- configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
- configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
- IConfig server_config = new MemoryConfig(configProps);
- MQTT_server.startServer(server_config);
- }
-
- @Before
- public void init() throws IOException {
- startServer();
- testRunner = TestRunners.newTestRunner(PublishMQTT.class);
- testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
- testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
- testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
- testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
- }
-
- @After
- public void tearDown() {
- if (MQTT_server != null) {
- MQTT_server.stopServer();
- }
- final File folder = new File("./target");
- final File[] files = folder.listFiles( new FilenameFilter() {
- @Override
- public boolean accept( final File dir,
- final String name ) {
- return name.matches( "moquette_store.mapdb.*" );
- }
- } );
- for ( final File file : files ) {
- if ( !file.delete() ) {
- System.err.println( "Can't remove " + file.getAbsolutePath() );
- }
- }
- }
-
- @Override
- public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
- //Cannot verify published message without subscribing and consuming it which is outside the scope of this test.
- }
-}
diff --git a/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml b/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml
index 48df689..0900532 100644
--- a/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml
+++ b/nifi-registry/nifi-registry-core/nifi-registry-web-docs/pom.xml
@@ -60,7 +60,7 @@
<!-- Needed this dependency to resolve the taglib inside documentation.jsp, otherwise an error is encountered
"The absolute uri: http://java.sun.com/jsp/jstl/core cannot be resolved" -->
<dependency>
- <groupId>javax.servlet.jsp.jstl</groupId>
+ <groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
diff --git a/nifi-registry/pom.xml b/nifi-registry/pom.xml
index 3a4c9dd..ee34b8e 100644
--- a/nifi-registry/pom.xml
+++ b/nifi-registry/pom.xml
@@ -48,28 +48,6 @@
<jgit.version>5.13.0.202109080827-r</jgit.version>
</properties>
- <pluginRepositories>
- <pluginRepository>
- <id>jcenter-releases</id>
- <name>jcenter</name>
- <url>https://jcenter.bintray.com</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </pluginRepository>
- <pluginRepository>
- <id>bintray</id>
- <name>Groovy Bintray</name>
- <url>https://dl.bintray.com/groovy/maven</url>
- <releases>
- <updatePolicy>never</updatePolicy>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </pluginRepository>
- </pluginRepositories>
-
<dependencyManagement>
<dependencies>
<!-- Logging dependencies that will be directly in lib -->
diff --git a/pom.xml b/pom.xml
index 39ff28f..10e1530 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,16 +149,6 @@
</snapshots>
</repository>
<repository>
- <id>jcenter</id>
- <url>https://jcenter.bintray.com</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- <releases>
- <enabled>true</enabled>
- </releases>
- </repository>
- <repository>
<id>Shibboleth</id>
<name>Shibboleth</name>
<url>https://build.shibboleth.net/nexus/content/repositories/releases/</url>