[
https://issues.apache.org/jira/browse/NIFI-1976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15334992#comment-15334992
]
ASF GitHub Bot commented on NIFI-1976:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/525#discussion_r67443126
--- Diff:
nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/test/java/org/apache/nifi/processors/windows/event/log/EvtSubscribeTest.java
---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.windows.event.log;
+
+import com.google.common.net.MediaType;
+import com.sun.jna.Pointer;
+import com.sun.jna.platform.win32.Kernel32;
+import com.sun.jna.platform.win32.W32Errors;
+import com.sun.jna.platform.win32.WinDef;
+import com.sun.jna.platform.win32.WinNT;
+import org.apache.commons.io.Charsets;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import
org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback;
+import org.apache.nifi.processors.windows.event.log.jna.WEvtApi;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(JNAJUnitRunner.class)
+public class EvtSubscribeTest {
+ @Mock
+ Kernel32 kernel32;
+
+ @Mock
+ WEvtApi wEvtApi;
+
+ @Mock
+ ProcessContext processContext;
+
+ @Mock
+ ProcessSessionFactory processSessionFactory;
+
+ @Mock
+ ProcessSession processSession;
+
+ private EvtSubscribe evtSubscribe;
+
+ @Before
+ public void setup() {
+
when(processSessionFactory.createSession()).thenReturn(processSession);
+ evtSubscribe = new EvtSubscribe(wEvtApi, kernel32);
+ }
+
+ @Test
+ public void testFlow() throws Exception {
+ int maxEventQueue = 1;
+ int maxBuffer = 1024;
+ String testChannel = "testChannel";
+ String testQuery = "testQuery";
+ String testXml3 = "TestXml3";
+ String testXml4 = "TestXml4";
+
+ FlowFile flowFile1 = mock(FlowFile.class);
+ FlowFile flowFile2 = mock(FlowFile.class);
+ FlowFile flowFile3 = mock(FlowFile.class);
+
+ FlowFile flowFile4 = mock(FlowFile.class);
+ FlowFile flowFile5 = mock(FlowFile.class);
+ FlowFile flowFile6 = mock(FlowFile.class);
+
+ ByteArrayOutputStream byteArrayOutputStream1 = new
ByteArrayOutputStream();
+ ByteArrayOutputStream byteArrayOutputStream2 = new
ByteArrayOutputStream();
+
+
when(processSession.create()).thenReturn(flowFile1).thenReturn(flowFile4).thenReturn(null);
+ when(processSession.write(eq(flowFile1),
isA(OutputStreamCallback.class))).thenAnswer(invocation -> {
+ ((OutputStreamCallback)
invocation.getArguments()[1]).process(byteArrayOutputStream1);
+ return flowFile2;
+ });
+ when(processSession.write(eq(flowFile4),
isA(OutputStreamCallback.class))).thenAnswer(invocation -> {
+ ((OutputStreamCallback)
invocation.getArguments()[1]).process(byteArrayOutputStream2);
+ return flowFile5;
+ });
+
+ AtomicReference<String> mimeType1 = new AtomicReference<>(null);
+ AtomicReference<String> mimeType2 = new AtomicReference<>(null);
+ when(processSession.putAttribute(eq(flowFile2),
eq(CoreAttributes.MIME_TYPE.key()), anyString())).thenAnswer(invocation -> {
+ mimeType1.set((String) invocation.getArguments()[2]);
+ return flowFile3;
+ });
+ when(processSession.putAttribute(eq(flowFile5),
eq(CoreAttributes.MIME_TYPE.key()), anyString())).thenAnswer(invocation -> {
+ mimeType2.set((String) invocation.getArguments()[2]);
+ return flowFile6;
+ });
+
+ PropertyValue maxEventSize = mock(PropertyValue.class);
+ when(maxEventSize.asInteger()).thenReturn(maxEventQueue);
+
+ PropertyValue maxBufferSize = mock(PropertyValue.class);
+ when(maxBufferSize.asInteger()).thenReturn(maxBuffer);
+
+ PropertyValue channel = mock(PropertyValue.class);
+ when(channel.getValue()).thenReturn(testChannel);
+
+ PropertyValue query = mock(PropertyValue.class);
+ when(query.getValue()).thenReturn(testQuery);
+
+ WinNT.HANDLE subscriptionHandle = mock(WinNT.HANDLE.class);
+ when(wEvtApi.EvtSubscribe(isNull(WinNT.HANDLE.class),
isNull(WinNT.HANDLE.class), eq(testChannel), eq(testQuery),
+ isNull(WinNT.HANDLE.class), isNull(WinDef.PVOID.class),
isA(EventSubscribeXmlRenderingCallback.class),
+ eq(WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE)))
+ .thenReturn(subscriptionHandle);
+
+
when(processContext.getProperty(EvtSubscribe.MAX_EVENT_QUEUE_SIZE)).thenReturn(maxEventSize);
+
when(processContext.getProperty(EvtSubscribe.MAX_BUFFER_SIZE)).thenReturn(maxBufferSize);
+
when(processContext.getProperty(EvtSubscribe.CHANNEL)).thenReturn(channel);
+
when(processContext.getProperty(EvtSubscribe.QUERY)).thenReturn(query);
+
+ evtSubscribe.subscribeToEvents(processContext);
+ ArgumentCaptor<EventSubscribeXmlRenderingCallback>
callbackArgumentCaptor =
ArgumentCaptor.forClass(EventSubscribeXmlRenderingCallback.class);
+ verify(wEvtApi).EvtSubscribe(isNull(WinNT.HANDLE.class),
isNull(WinNT.HANDLE.class), eq(testChannel), eq(testQuery),
+ isNull(WinNT.HANDLE.class), isNull(WinDef.PVOID.class),
callbackArgumentCaptor.capture(),
+ eq(WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE));
+
+ EventSubscribeXmlRenderingCallback callback =
callbackArgumentCaptor.getValue();
+ Consumer<String> consumer = callback.getConsumer();
+ consumer.accept("TestXml1");
+ consumer.accept("TestXml2");
+ consumer.accept(testXml3);
+
+ evtSubscribe.onTrigger(processContext, processSessionFactory);
+ verify(processSession).transfer(flowFile3,
EvtSubscribe.REL_SUCCESS);
+ verify(processSession, times(1)).transfer(any(FlowFile.class),
eq(EvtSubscribe.REL_SUCCESS));
+ verify(processContext).yield();
+
+ assertEquals(processSessionFactory,
evtSubscribe.getProcessSessionFactory());
+ assertEquals(MediaType.APPLICATION_XML_UTF_8.toString(),
mimeType1.get());
+ assertEquals(testXml3,
Charsets.UTF_8.decode(ByteBuffer.wrap(byteArrayOutputStream1.toByteArray())).toString());
+
+ consumer.accept(testXml4);
+ assertEquals(MediaType.APPLICATION_XML_UTF_8.toString(),
mimeType2.get());
+ assertEquals(testXml4,
Charsets.UTF_8.decode(ByteBuffer.wrap(byteArrayOutputStream2.toByteArray())).toString());
+
+ verify(processSession).transfer(flowFile6,
EvtSubscribe.REL_SUCCESS);
+ verify(processSession, times(2)).transfer(any(FlowFile.class),
eq(EvtSubscribe.REL_SUCCESS));
+
+ evtSubscribe.closeSubscriptionHandle();
+ verify(kernel32).CloseHandle(subscriptionHandle);
+ }
+
+ @Test
+ public void testLifecycle() throws InvocationTargetException,
IllegalAccessException {
+ WinNT.HANDLE subscriptionHandle = mock(WinNT.HANDLE.class);
+ when(wEvtApi.EvtSubscribe(isNull(WinNT.HANDLE.class),
isNull(WinNT.HANDLE.class), eq(EvtSubscribe.DEFAULT_CHANNEL),
eq(EvtSubscribe.DEFAULT_XPATH),
+ isNull(WinNT.HANDLE.class), isNull(WinDef.PVOID.class),
isA(EventSubscribeXmlRenderingCallback.class),
+ eq(WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE)))
+ .thenReturn(subscriptionHandle);
+
+ TestRunner testRunner = TestRunners.newTestRunner(evtSubscribe);
+
+ // Want to test things that can happen after schedule but before
trigger
+ ProcessContext context = testRunner.getProcessContext();
+ ((MockProcessContext) context).assertValid();
+ ((MockProcessContext) context).enableExpressionValidation();
+ ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class,
evtSubscribe, context);
+
+ ArgumentCaptor<EventSubscribeXmlRenderingCallback>
callbackArgumentCaptor =
ArgumentCaptor.forClass(EventSubscribeXmlRenderingCallback.class);
+ verify(wEvtApi).EvtSubscribe(isNull(WinNT.HANDLE.class),
isNull(WinNT.HANDLE.class), eq(EvtSubscribe.DEFAULT_CHANNEL),
eq(EvtSubscribe.DEFAULT_XPATH),
+ isNull(WinNT.HANDLE.class), isNull(WinDef.PVOID.class),
callbackArgumentCaptor.capture(),
+ eq(WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE));
+
+ EventSubscribeXmlRenderingCallback renderingCallback =
callbackArgumentCaptor.getValue();
+ AtomicInteger bufferSize = new
AtomicInteger(EventSubscribeXmlRenderingCallback.INITIAL_BUFFER_SIZE - 512);
+ for (int i = 0; i < 1025; i++) {
--- End diff --
What is this loop testing?
> JNA-Based Event Log Subscription Processor
> ------------------------------------------
>
> Key: NIFI-1976
> URL: https://issues.apache.org/jira/browse/NIFI-1976
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Bryan Rosander
>
> Using JNA, we should be able to leverage existing Windows APIs to natively
> consume events as they happen. Will look into subscribing to events
> (https://msdn.microsoft.com/en-us/library/windows/desktop/aa385771(v=vs.85).aspx)
> in order to reduce latency.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)