simonbence commented on a change in pull request #4714: URL: https://github.com/apache/nifi/pull/4714#discussion_r539468131
########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import com.splunk.SSLSecurityProtocol; +import com.splunk.Service; +import com.splunk.ServiceArgs; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +abstract class SplunkAPICall extends AbstractProcessor { + private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel"; + + private static final String HTTP_SCHEME = "http"; + private static final String HTTPS_SCHEME = "https"; + + private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name()); + private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name()); + private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name()); + private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name()); + + static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder() + .name("Scheme") + .description("The scheme for connecting to Splunk.") + .allowableValues(HTTPS_SCHEME, HTTP_SCHEME) + .defaultValue(HTTPS_SCHEME) + .required(true) + .build(); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The ip address or hostname of the Splunk server.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("localhost") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The HTTP Port Number of the Splunk server.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("9088") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("Security Protocol") + .description("The security protocol to use for communicating with Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE) + .defaultValue(TLS_1_2_VALUE.getValue()) + .build(); + + static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() + .name("Owner") + .description("The owner to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder() + .name("Token") + .description("The token to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("The username to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The password to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); + + static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder() + .name("request-channel") + .displayName("Splunk Request Channel") + .description("Identifier of the used request channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("splunk-ack-id-attribute-name") + .displayName("Splunk Acknowledgement Id Attribute Name") + .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.") + .defaultValue("splunk_acknowledgement_id") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("splunk-sent-at-attribute") + .displayName("Splunk Sent At Attribute Name") + .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.") + .defaultValue("splunk_send_at") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList( + SCHEME, + HOSTNAME, + PORT, + SECURITY_PROTOCOL, + OWNER, + TOKEN, + USERNAME, + PASSWORD, + REQUEST_CHANNEL, + SPLUNK_ACK_ID_ATTRIBUTE, + SPLUNK_SENT_AT_ATTRIBUTE + ); + + protected final JsonFactory jsonFactory = new JsonFactory(); + protected final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory); + + protected volatile ServiceArgs splunkServiceArguments; + protected volatile Service splunkService; + protected volatile String requestChannel; + protected volatile String ackIdAttributeName; + protected volatile String insertedAtAttributeName; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return SplunkAPICall.PROPERTIES; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + splunkServiceArguments = getSplunkServiceArgs(context); + splunkService = getSplunkService(splunkServiceArguments); + requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue(); + ackIdAttributeName = context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue(); + insertedAtAttributeName = context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue(); + } + + private ServiceArgs getSplunkServiceArgs(final ProcessContext context) { + final ServiceArgs splunkServiceArguments = new ServiceArgs(); + + splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue()); + splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue()); + splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger()); + + if (context.getProperty(OWNER).isSet()) { + splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(TOKEN).isSet()) { + splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(USERNAME).isSet()) { + splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(PASSWORD).isSet()) { + splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue()); + } + + if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) { + splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue())); + } + + return splunkServiceArguments; + } + + protected Service getSplunkService(final ServiceArgs splunkServiceArguments) { + return Service.connect(splunkServiceArguments); + } + + @OnUnscheduled + public void onUnscheduled() { + if (splunkService != null) { + splunkService.logout(); + splunkService = null; + } + + requestChannel = null; + ackIdAttributeName = null; + insertedAtAttributeName = null; + splunkServiceArguments = null; + } + + protected ResponseMessage call(final String endpoint, final RequestMessage request) { + request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel); + + try { + return splunkService.send(endpoint, request); + //Catch Stale connection exception, reinitialize, and retry + } catch (final com.splunk.HttpException e) { + getLogger().error("Splunk request status code:" + e.getStatus() + " Retrying the request."); + splunkService.logout(); + splunkService = getSplunkService(splunkServiceArguments); + return splunkService.send(endpoint, request); + } + } + + protected <T> T extractResult(final InputStream responseBody, final Class<T> type) throws IOException { + final JsonParser jsonParser = jsonFactory.createParser(responseBody); + jsonParser.setCodec(jsonObjectMapper); + return jsonParser.readValueAs(type); Review comment: Nice improvement, I was not aware of this ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
