exceptionfactory commented on a change in pull request #4714:
URL: https://github.com/apache/nifi/pull/4714#discussion_r538744071



##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with 
Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, 
SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new 
PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new 
ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = 
context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && 
context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));

Review comment:
       The Splunk Service object has a setSSLSocketFactory method that would 
allow leveraging the NiFi StandardRestrictedSSLContextService instead of 
providing custom SSL properties.  Did you evaluate using that method instead of 
providing this Security Protocol property?

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with 
Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, 
SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new 
PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new 
ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = 
context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && 
context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs 
splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (splunkService != null) {
+            splunkService.logout();
+            splunkService = null;
+        }
+
+        requestChannel = null;
+        ackIdAttributeName = null;
+        insertedAtAttributeName = null;
+        splunkServiceArguments = null;
+    }
+
+    protected ResponseMessage call(final String endpoint, final RequestMessage 
request)  {
+        request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
+
+        try {
+            return splunkService.send(endpoint, request);
+            //Catch Stale connection exception, reinitialize, and retry
+        } catch (final com.splunk.HttpException e) {

Review comment:
       Is there a reason for using the fully qualified class name 
(com.splunk.HttpException) as opposed to importing it?

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestQuerySplunkIndexingStatus {
+    private static final String EVENT = 
"{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}";
+
+    @Mock
+    private Service service;
+
+    @Mock
+    private ResponseMessage response;
+
+    private MockedQuerySplunkIndexingStatus processor;
+    private TestRunner testRunner;
+
+    private ArgumentCaptor<String> path;
+    private ArgumentCaptor<RequestMessage> request;
+
+    @Before
+    public void setUp() {
+        processor = new MockedQuerySplunkIndexingStatus(service);
+        testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(SplunkAPICall.SCHEME, "http");
+        testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 
888c5a81-8777-49a0-a3af-f76e050ab5d9");
+        testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, 
"22bd7414-0d77-4c73-936d-c8f5d1b21862");
+
+        path = ArgumentCaptor.forClass(String.class);
+        request = ArgumentCaptor.forClass(RequestMessage.class);
+        Mockito.when(service.send(path.capture(), 
request.capture())).thenReturn(response);
+    }
+
+    @After
+    public void tearDown() {
+        testRunner.shutdown();
+    }
+
+    @Test
+    public void testHappyPath() throws Exception {

Review comment:
       Recommend renaming to testRunSuccess()

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());

Review comment:
       Recommend eliminating TLSv1 and SSLv3 as options since they are 
generally considered to be vulnerable protocols.  SSLv3 is also disabled on most 
recent Java installations, so it may not work even if selected.  As mentioned 
in a separate comment, refactoring to leverage the NiFi SSLContextService would 
delegate the protocol selection to that service as opposed to introducing 
options specific to this Processor.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor

Review comment:
       Although the Scheme, Hostname, and Port properties mirror the options 
for the Splunk Service, did you consider providing the NiFi property definition 
as a URL?  Providing the NiFi property as a URL would still allow for 
retrieving the scheme, hostname, and port elements for instantiation, and could 
also simplify the NiFi Processor configuration.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with 
Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, 
SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new 
PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new 
ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = 
context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && 
context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs 
splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (splunkService != null) {
+            splunkService.logout();
+            splunkService = null;
+        }
+
+        requestChannel = null;
+        ackIdAttributeName = null;
+        insertedAtAttributeName = null;
+        splunkServiceArguments = null;
+    }
+
+    protected ResponseMessage call(final String endpoint, final RequestMessage 
request)  {
+        request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
+
+        try {
+            return splunkService.send(endpoint, request);
+            //Catch Stale connection exception, reinitialize, and retry
+        } catch (final com.splunk.HttpException e) {
+            getLogger().error("Splunk request status code:" + e.getStatus() + 
" Retrying the request.");

Review comment:
       Recommend adjusting the logging statement to use parameters:
   ```suggestion
               getLogger().error("Splunk failed with HTTP {}: Retrying the request", new Object[]{ e.getStatus() });
   ```

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with 
Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, 
SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new 
PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to 
store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new 
ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = 
context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = 
context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && 
context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs 
splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (splunkService != null) {
+            splunkService.logout();
+            splunkService = null;
+        }
+
+        requestChannel = null;
+        ackIdAttributeName = null;
+        insertedAtAttributeName = null;
+        splunkServiceArguments = null;
+    }
+
+    protected ResponseMessage call(final String endpoint, final RequestMessage 
request)  {
+        request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
+
+        try {
+            return splunkService.send(endpoint, request);
+            //Catch Stale connection exception, reinitialize, and retry
+        } catch (final com.splunk.HttpException e) {
+            getLogger().error("Splunk request status code:" + e.getStatus() + 
" Retrying the request.");
+            splunkService.logout();
+            splunkService = getSplunkService(splunkServiceArguments);
+            return splunkService.send(endpoint, request);
+        }
+    }
+
+    protected <T> T extractResult(final InputStream responseBody, final 
Class<T> type) throws IOException {
+        final JsonParser jsonParser = jsonFactory.createParser(responseBody);
+        jsonParser.setCodec(jsonObjectMapper);
+        return jsonParser.readValueAs(type);

Review comment:
       Is there a reason for not using 
`jsonObjectMapper.readValue(responseBody, type)`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to