[
https://issues.apache.org/jira/browse/NIFI-3536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066518#comment-16066518
]
ASF GitHub Bot commented on NIFI-3536:
--------------------------------------
Github user apsaltis commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1824#discussion_r124549792
--- Diff:
nifi-nar-bundles/nifi-soap-bundle/nifi-soap-processors/src/main/java/org/apache/nifi/processors/soap/GetSOAP.java
---
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.soap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.transport.http.HTTPConstants;
+import
org.apache.axis2.transport.http.impl.httpclient3.HttpTransportPropertiesImpl;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"SOAP", "Get", "Ingest", "Ingress"})
+@CapabilityDescription(
+ "Execute provided request against the SOAP endpoint. The result
will be left in it's orginal form and all " +
+ "headers that are returned will be written as attributes to the
resulting flow file.")
+@WritesAttributes({@WritesAttribute(attribute = "getsoap.status.code",
description = "The status code that is returned"),
+ @WritesAttribute(attribute = "getsoap.status.message", description
= "The status message that is returned"),
+ @WritesAttribute(attribute = "getsoap.request.wsdl_url",
description = "The request WSDL URL"),
+ @WritesAttribute(attribute = "getsoap.request.method", description
= "The request method"),
+ @WritesAttribute(attribute = "mime.type", description = "Sets mime
type to text/xml")})
+@DynamicProperty(name = "The name of a input parameter the needs to be
passed to the SOAP method being invoked.",
+ value = "The value for this parameter '=' and ',' are not
considered valid values and must be escaped . " +
+ "Note, if the value of parameter needs to be an array the
format should be key1=value1,key2=value2. ",
+ description = "The name provided will be the name sent in the SOAP
method, therefore please make sure " +
+ "it matches the wsdl documentation for the SOAP
service being called. In the case of arrays " +
+ "the name will be the name of the array and the
key's specified in the value will be the element " +
+ "names passed.")
+
+public class GetSOAP extends AbstractProcessor {
+
+ /** Required URL of the SOAP service endpoint; read as a literal value (no expression language). */
+ protected static final PropertyDescriptor ENDPOINT_URL = new PropertyDescriptor.Builder()
+     .name("endpoint-url")
+     .displayName("Endpoint URL")
+     .description("The endpoint url that hosts the web service(s) that should be called.")
+     .addValidator(StandardValidators.URL_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(true)
+     .build();
+
+ /** Required URL of the service's WSDL document; validated as a URL, no expression language. */
+ protected static final PropertyDescriptor WSDL_URL = new PropertyDescriptor.Builder()
+     .name("wsdl-url")
+     .displayName("WSDL URL")
+     .description("The url where the wsdl file can be retrieved and referenced.")
+     .addValidator(StandardValidators.URL_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(true)
+     .build();
+
+ /** Required name of the SOAP operation to invoke; supports expression language. */
+ protected static final PropertyDescriptor METHOD_NAME = new PropertyDescriptor.Builder()
+     .name("soap-method-name")
+     .displayName("SOAP Method Name")
+     .description("The method exposed by the SOAP webservice that should be invoked.")
+     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+     .expressionLanguageSupported(true)
+     .required(true)
+     .build();
+
+ /** Optional user name, flagged sensitive; paired with {@code PASSWORD} wherever credentials are read. */
+ protected static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+     .name("username")
+     .displayName("Username")
+     .description("The username to use in the case of basic Auth")
+     .sensitive(true)
+     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(false)
+     .build();
+
+ /** Optional password, flagged sensitive; paired with {@code USER_NAME} wherever credentials are read. */
+ protected static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+     .name("password")
+     .displayName("Password")
+     .description("The password to use in the case of basic Auth")
+     .sensitive(true)
+     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(false)
+     .build();
+
+ /**
+  * Optional HTTP User-Agent value handed to the Axis2 client options.
+  * The description quotes the default exactly as declared in {@code defaultValue}.
+  */
+ protected static final PropertyDescriptor USER_AGENT = new PropertyDescriptor.Builder()
+     .name("user-agent")
+     .displayName("User Agent")
+     .defaultValue("NiFi SOAP Processor")
+     .description("The user agent string to use, the default is NiFi SOAP Processor")
+     .required(false)
+     .expressionLanguageSupported(false)
+     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+     .build();
+
+ /** Optional socket read timeout (positive integer, default 60000) set on the Axis2 options as SO_TIMEOUT. */
+ protected static final PropertyDescriptor SO_TIMEOUT = new PropertyDescriptor.Builder()
+     .name("socket-timeout")
+     .displayName("Socket Timeout")
+     .description("The timeout value to use waiting for data from the webservice")
+     .defaultValue("60000")
+     .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(false)
+     .build();
+
+ /** Optional connect timeout (positive integer, default 60000) set on the Axis2 options as CONNECTION_TIMEOUT. */
+ protected static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+     .name("connection-timeout")
+     .displayName("Connection Timeout")
+     .description("The timeout value to use waiting to establish a connection to the web service")
+     .defaultValue("60000")
+     .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(false)
+     .build();
+
+ protected static final PropertyDescriptor AUTH_BY_HEADER = new
PropertyDescriptor.Builder()
+ .name("header-authentication")
+ .displayName("Header Authentication")
+ .description("If you need to do authentication with SOAP
headers, this must be a json string " +
+ "defining the structure since it varies from
service to service. Username and Password " +
+ "must be set in the separate parameters and
referenced here " +
+ "as variables ${Username} and ${Password}. For
example, " +
+ "{ \"UserAuthentication\": { \"username\":
\"${Username}\", \"password\": \"${Password}\"," +
+ " \"some_other_custom_field\": \"value\" }")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String
value, ValidationContext context) {
+ try {
+ return (new ValidationResult.Builder())
+ .subject(subject)
+ .input(value)
+ .explanation("Header Authentication must
be empty or valid json")
+ .valid("".equals(value) ||
mapper.readTree(value) != null).build();
+ } catch (IOException e) {
+ return (new ValidationResult.Builder())
+ .subject(subject)
+ .input(value)
+ .explanation("Header Authentication must
be empty or valid json: " + e.getMessage())
+ .valid(false).build();
+ }
+ }
+ })
+ .build();
+
+ /** Required XML namespace URI; onTrigger uses it for the request elements and as the prefix of the SOAP action. */
+ protected static final PropertyDescriptor API_NAMESPACE = new PropertyDescriptor.Builder()
+     .name("namespace")
+     .displayName("Namespace")
+     .description("XML Namespace.")
+     .dynamic(false)
+     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(true)
+     .build();
+
+ /** Required boolean switch (default "false"): when true, only the text of the response's first child element is written. */
+ protected static final PropertyDescriptor SKIP_FIRST_ELEMENT = new PropertyDescriptor.Builder()
+     .name("skip-first-element")
+     .displayName("Skip First Element")
+     .description("Return the XML that comes after the first element.")
+     .allowableValues("true", "false")
+     .defaultValue("false")
+     .dynamic(false)
+     .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+     .expressionLanguageSupported(false)
+     .required(true)
+     .build();
+
+ /** The only relationship this processor exposes; onTrigger transfers the populated FlowFile here. */
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+     .description("All FlowFiles that are created are routed to this relationship")
+     .name("success")
+     .build();
+
+ private List<PropertyDescriptor> descriptors; // assigned once in init() as an unmodifiable list
+
+ private static final ObjectMapper mapper = new ObjectMapper(); // shared by the AUTH_BY_HEADER validator and getAuthHeader
+
+ /**
+  * Assembles the fixed, ordered list of supported properties and publishes it as an unmodifiable list.
+  *
+  * @param context initialization context supplied by the framework (not used here)
+  */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+     final List<PropertyDescriptor> supported = new ArrayList<>();
+     Collections.addAll(supported,
+         ENDPOINT_URL,
+         WSDL_URL,
+         METHOD_NAME,
+         USER_NAME,
+         PASSWORD,
+         USER_AGENT,
+         SO_TIMEOUT,
+         CONNECTION_TIMEOUT,
+         AUTH_BY_HEADER,
+         API_NAMESPACE,
+         SKIP_FIRST_ELEMENT);
+     this.descriptors = Collections.unmodifiableList(supported);
+ }
+
+ /**
+  * @return a new mutable set holding the single {@code REL_SUCCESS} relationship
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     return new HashSet<>(Collections.singleton(REL_SUCCESS));
+ }
+
+ /**
+  * @return the unmodifiable property list built in {@code init}
+  */
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     return this.descriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .description("Specifies the method name and parameter
names and values for '" +
+ propertyDescriptorName +
+ "' the SOAP method being called.")
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .build();
+ }
+
+ /**
+  * Adds a cross-property check on top of the inherited validation: header authentication
+  * substitutes both ${Username} and ${Password}, so it is rejected unless BOTH credentials are set.
+  *
+  * @param context validation context giving access to the configured property values
+  * @return the inherited validation results plus at most one result for Header Authentication
+  */
+ @Override
+ protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
+     final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
+
+     final boolean credentialsComplete =
+         context.getProperty(USER_NAME).isSet() && context.getProperty(PASSWORD).isSet();
+
+     // A single missing credential is as invalid as both missing: the header template would be
+     // rendered with an empty placeholder.
+     if (context.getProperty(AUTH_BY_HEADER).isSet() && !credentialsComplete) {
+         problems.add(
+             new ValidationResult.Builder()
+                 .subject(AUTH_BY_HEADER.getName())
+                 .input(context.getProperty(AUTH_BY_HEADER).getValue())
+                 .valid(false)
+                 .explanation("To use " + AUTH_BY_HEADER.getDisplayName() + " you must also set " +
+                     USER_NAME.getDisplayName() + " and " + PASSWORD.getDisplayName())
+                 .build()
+         );
+     }
+
+     return problems;
+ }
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ final ComponentLog logger = getLogger();
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ flowFile = session.create();
+ }
+
+ final Options options = getOptions(context);
+ final OMFactory fac = OMAbstractFactory.getOMFactory();
+ final String apiNamespace =
context.getProperty(API_NAMESPACE).getValue();
+ final OMNamespace omNamespace =
fac.createOMNamespace(apiNamespace, "ns1");
+
+
+ final OMElement authHeader = getAuthHeader(context, fac,
omNamespace);
+
+ ServiceClient serviceClient;
+ try {
+ serviceClient = new ServiceClient();
+ serviceClient.setOptions(options);
+ } catch (AxisFault axisFault) {
+ getLogger().error(
+ "Failed to create webservice client, please check that
the service endpoint is available and " +
+ "the property is valid.",
+ axisFault);
+ throw new ProcessException(axisFault);
+ }
+ final String methodName = context.getProperty(METHOD_NAME)
+
.evaluateAttributeExpressions(flowFile)
+ .getValue();
+
+ try {
+
+ serviceClient.getOptions().setAction(apiNamespace +
methodName);
+
+ //get the dynamic properties, execute the call and return the
results
+ final OMElement method = getSoapMethod(fac, omNamespace,
methodName);
+
+ //now we need to walk the arguments and add them
+ addArgumentsToMethod(context, fac, omNamespace, method,
flowFile);
+ final OMElement result = executeSoapMethod(serviceClient,
authHeader, method);
+ logger.debug("RESULT" + result);
+ final boolean skipFirstElement =
context.getProperty(SKIP_FIRST_ELEMENT).asBoolean();
+ flowFile = processSoapRequest(session, result, flowFile,
skipFirstElement);
+
+
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final Throwable t) {
+ context.yield();
+ session.rollback();
+ logger.error("Failed to process due to {}; rolling back
session", new Object[]{t.getMessage()}, t);
+ throw t;
+ }
+ }
+
+ /**
+  * Builds the SOAP authentication header from the Header Authentication JSON template.
+  * The template is evaluated with "Username" and "Password" taken from the corresponding
+  * properties, parsed as a JSON object and converted into an element tree.
+  *
+  * @param context     gives access to the configured properties
+  * @param fac         factory used to create the header elements
+  * @param omNamespace namespace applied to every created element
+  * @return the header element, or {@code null} when header authentication is not configured or
+  *         evaluates to an empty string
+  * @throws ProcessException if the evaluated template cannot be parsed as a JSON object; sending
+  *         the request without the configured credentials would hide the misconfiguration
+  */
+ private OMElement getAuthHeader(ProcessContext context, OMFactory fac, OMNamespace omNamespace) {
+     if (!context.getProperty(AUTH_BY_HEADER).isSet()) {
+         return null;
+     }
+
+     // Expose the credentials to the template under the names it references.
+     final Map<String, String> attributes = new HashMap<>();
+     attributes.put("Username", context.getProperty(USER_NAME).getValue());
+     attributes.put("Password", context.getProperty(PASSWORD).getValue());
+
+     // Evaluate the template so ${Username} and ${Password} are replaced with the actual values.
+     final String authByHeader = context.getProperty(AUTH_BY_HEADER)
+         .evaluateAttributeExpressions(attributes)
+         .getValue();
+     if (authByHeader == null || authByHeader.isEmpty()) {
+         return null;
+     }
+
+     final Map<String, Object> map;
+     try {
+         map = mapper.readValue(authByHeader, new TypeReference<Map<String, Object>>() {
+         });
+     } catch (IOException e) {
+         throw new ProcessException(
+             "Header Authentication could not be parsed as a JSON object after variable substitution", e);
+     }
+
+     return createAuthHeader(fac, omNamespace, null, map);
+ }
+
+ /**
+  * Translates the processor configuration into Axis2 client options: target endpoint, transport
+  * protocol, user agent, timeouts and, when credentials are set without header authentication,
+  * an HTTP authenticator.
+  *
+  * @param context gives access to the configured properties
+  * @return a freshly populated {@code Options} instance
+  */
+ private Options getOptions(ProcessContext context) {
+     final String endpoint = context.getProperty(ENDPOINT_URL).getValue();
+
+     final Options opts = new Options();
+     opts.setTo(new EndpointReference(endpoint));
+     opts.setTransportInProtocol(isHTTPS(endpoint) ? Constants.TRANSPORT_HTTPS : Constants.TRANSPORT_HTTP);
+
+     opts.setCallTransportCleanup(true);
+     opts.setProperty(HTTPConstants.CHUNKED, false);
+
+     opts.setProperty(HTTPConstants.USER_AGENT, context.getProperty(USER_AGENT).getValue());
+     opts.setProperty(HTTPConstants.SO_TIMEOUT, context.getProperty(SO_TIMEOUT).asInteger());
+     opts.setProperty(HTTPConstants.CONNECTION_TIMEOUT, context.getProperty(CONNECTION_TIMEOUT).asInteger());
+
+     // Transport-level authentication applies only when both credentials are present and the
+     // header-based scheme is not in use.
+     final boolean hasCredentials =
+         context.getProperty(USER_NAME).isSet() && context.getProperty(PASSWORD).isSet();
+     if (hasCredentials && !context.getProperty(AUTH_BY_HEADER).isSet()) {
+         final HttpTransportPropertiesImpl.Authenticator authenticator =
+             new HttpTransportPropertiesImpl.Authenticator();
+         authenticator.setUsername(context.getProperty(USER_NAME).getValue());
+         authenticator.setPassword(context.getProperty(PASSWORD).getValue());
+         opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.AUTHENTICATE, authenticator);
+     }
+     return opts;
+ }
+
+ protected OMElement createAuthHeader(final OMFactory fac, final
OMNamespace omNamespace, final OMElement parent, Map<String, Object> map) {
+ OMElement element = null;
+ if (map != null) {
+ for (String key : map.keySet()) {
+ element = fac.createOMElement(key, omNamespace);
+ Object value = map.get(key);
+ if (value instanceof String) {
+ element.setText(value.toString());
+ if (parent != null) {
+ parent.addChild(element);
+ }
+ } else {
+ OMElement childElement = createAuthHeader(fac,
omNamespace, element, (Map) value);
+ element.addChild(childElement);
+ }
+ }
+ }
+ return element;
+ }
+
+ FlowFile processSoapRequest(ProcessSession session, final OMElement
result, FlowFile flowFile, final boolean skipFirstElement) {
+ flowFile = session.write(flowFile, out -> {
+ try {
+ if (skipFirstElement) {
+ String response = result.getFirstElement().getText();
+ out.write(response.getBytes());
+ } else {
+ out.write(result.toString().getBytes());
+ }
+ } catch (AxisFault axisFault) {
+ final ComponentLog logger = getLogger();
+ if (null != logger) {
+ logger.error("Failed parsing the data that came back
from the web service method", axisFault);
+ }
+ throw new ProcessException(axisFault);
+ }
+ });
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/xml");
+ return session.putAllAttributes(flowFile, attributes);
+ }
+
+ OMElement executeSoapMethod(final ServiceClient serviceClient, final
OMElement authHeader, final OMElement method) {
+ try {
+ if (authHeader != null) {
+ serviceClient.addHeader(authHeader);
+ }
+ getLogger().info("Sending authHeader: " + authHeader);
--- End diff --
Makes total sense.
> Add GetSOAP Processor
> ---------------------
>
> Key: NIFI-3536
> URL: https://issues.apache.org/jira/browse/NIFI-3536
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Andrew Psaltis
> Assignee: Andrew Psaltis
>
> Although it may seem like everyone is now using REST or at least RESTful
> services, there still remains a need for many enterprises to interact with
> SOAP endpoints. This JIRA is for a new processor that supports interacting
> with SOAP endpoints.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)