thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r732938960
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -157,90 +165,71 @@ public void testRunBatching() throws IOException {
@Test
public void testRunMutualTls() throws IOException, TlsException,
InitializationException {
+
+
final String serviceIdentifier = SSLContextService.class.getName();
-
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+ when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
final SSLContext sslContext =
SslContextUtils.createKeyStoreSslContext();
- Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+ when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(serviceIdentifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+ runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
final int syslogFrames = 3;
final List<RELPFrame> frames = getFrames(syslogFrames);
run(frames, syslogFrames, syslogFrames, sslContext);
}
- @Test
- public void testRunNoEventsAvailable() {
- MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
- runner = TestRunners.newTestRunner(mockListenRELP);
- runner.setProperty(ListenRELP.PORT,
Integer.toString(NetworkUtils.availablePort()));
-
- runner.run();
- runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
- runner.shutdown();
- }
-
@Test
public void testBatchingWithDifferentSenders() {
- final String sender1 = "sender1";
- final String sender2 = "sender2";
-
- final List<RELPEvent> mockEvents = new ArrayList<>();
- mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(),
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
- mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(),
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
- mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(),
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
- mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(),
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+ String sender1 = "/192.168.1.50:55000";
+ String sender2 = "/192.168.1.50:55001";
+ String sender3 = "/192.168.1.50:55002";
+
+ final List<RELPMessage> mockEvents = new ArrayList<>();
+ mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(),
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+ mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(),
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+ mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(),
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+ mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(),
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+ mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(),
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+ mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(),
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP);
- runner.setProperty(ListenRELP.PORT,
Integer.toString(NetworkUtils.availablePort()));
- runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+ runner.setProperty(AbstractListenEventBatchingProcessor.PORT,
Integer.toString(NetworkUtils.availablePort()));
+ runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
runner.run();
- runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+ runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
runner.shutdown();
}
private void run(final List<RELPFrame> frames, final int flowFiles, final
int responses, final SSLContext sslContext)
throws IOException {
final int port = NetworkUtils.availablePort();
- runner.setProperty(ListenRELP.PORT, Integer.toString(port));
+ runner.setProperty(AbstractListenEventBatchingProcessor.PORT,
Integer.toString(port));
// Run Processor and start Dispatcher without shutting down
runner.run(1, false, true);
-
- try (final Socket socket = getSocket(port, sslContext)) {
- final OutputStream outputStream = socket.getOutputStream();
- sendFrames(frames, outputStream);
-
- // Run Processor for number of responses
- runner.run(responses, false, false);
-
- runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
- } finally {
- runner.shutdown();
- }
+ final byte[] relpMessages = getRELPMessages(frames);
+ sendMessages(port, relpMessages, sslContext);
+ runner.run(flowFiles, false, false);
Review comment:
I could be doing something wrong, but the reason for the first run is to
initialize the RELP server. We then send the messages to that server and run
the runner again to process the messages received. I believe this differs a
little from the usual approach because the Netty server runs in a separate
thread, outside of the typical NiFi trigger method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]