Repository: nifi Updated Branches: refs/heads/master c360b57a1 -> ef192cc85
NIFI-1751 Added proxy authentication in InvokeHttp processor This closes #398. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef192cc8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef192cc8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef192cc8 Branch: refs/heads/master Commit: ef192cc859c30ba0022c0124c4bac22c5930e82f Parents: c360b57 Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Sun May 1 22:09:27 2016 +0200 Committer: Aldrin Piri <ald...@apache.org> Committed: Tue Jun 21 14:09:02 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/InvokeHTTP.java | 68 +++++++++++++++---- .../standard/util/MultiAuthenticator.java | 69 ++++++++++++++++++++ .../processors/standard/TestInvokeHTTP.java | 10 +++ 3 files changed, 136 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ef192cc8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 8d4d1e5..6c3e32e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -49,17 +49,15 @@ import javax.net.ssl.SSLSession; import com.burgstaller.okhttp.AuthenticationCacheInterceptor; import 
com.burgstaller.okhttp.CachingAuthenticatorDecorator; -import com.burgstaller.okhttp.DispatchingAuthenticator; import com.burgstaller.okhttp.digest.CachingAuthenticator; import com.burgstaller.okhttp.digest.DigestAuthenticator; - import com.squareup.okhttp.MediaType; import com.squareup.okhttp.OkHttpClient; import com.squareup.okhttp.Request; import com.squareup.okhttp.RequestBody; import com.squareup.okhttp.Response; - import com.squareup.okhttp.ResponseBody; + import okio.BufferedSink; import org.apache.commons.io.input.TeeInputStream; import org.apache.commons.lang3.StringUtils; @@ -85,6 +83,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.MultiAuthenticator; import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; @@ -214,6 +213,23 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor PROP_PROXY_USER = new PropertyDescriptor.Builder() + .name("invokehttp-proxy-user") + .displayName("Proxy Username") + .description("Username to set when authenticating against proxy") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_PROXY_PASSWORD = new PropertyDescriptor.Builder() + .name("invokehttp-proxy-password") + .displayName("Proxy Password") + .description("Password to set when authenticating against proxy") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder() .name("Content-Type") 
.description("The Content-Type to specify for when content is being transmitted through a PUT or POST. " @@ -349,6 +365,8 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_BASIC_AUTH_PASSWORD, PROP_PROXY_HOST, PROP_PROXY_PORT, + PROP_PROXY_USER, + PROP_PROXY_PASSWORD, PROP_PUT_OUTPUT_IN_ATTRIBUTE, PROP_PUT_ATTRIBUTE_MAX_LENGTH, PROP_DIGEST_AUTH, @@ -454,7 +472,7 @@ public final class InvokeHTTP extends AbstractProcessor { @Override protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(1); + final List<ValidationResult> results = new ArrayList<>(3); final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet(); final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet(); @@ -462,6 +480,16 @@ public final class InvokeHTTP extends AbstractProcessor { results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build()); } + final boolean proxyUserSet = validationContext.getProperty(PROP_PROXY_USER).isSet(); + final boolean proxyPwdSet = validationContext.getProperty(PROP_PROXY_PASSWORD).isSet(); + + if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) { + results.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build()); + } + if(proxyUserSet && !proxyHostSet) { + results.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy username is set, proxy host must be set").build()); + } + return results; } @@ -500,7 +528,16 @@ public final class InvokeHTTP extends AbstractProcessor { okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier())); } + setAuthenticator(okHttpClient, context); + + useChunked = 
context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean(); + + okHttpClientAtomicReference.set(okHttpClient); + } + + private void setAuthenticator(OkHttpClient okHttpClient, ProcessContext context) { final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); + final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).getValue()); // If the username/password properties are set then check if digest auth is being used if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { @@ -512,22 +549,31 @@ public final class InvokeHTTP extends AbstractProcessor { * Once added this should be refactored to use the built in support. For now, a third party lib is needed. */ final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>(); - com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass); - final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials); - DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder() + MultiAuthenticator authenticator = new MultiAuthenticator.Builder() .with("Digest", digestAuthenticator) .build(); + if(!proxyUsername.isEmpty()) { + final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).getValue(); + authenticator.setProxyUsername(proxyUsername); + authenticator.setProxyPassword(proxyPassword); + } + okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache)); okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache)); + } else { + // Add proxy authentication only + if(!proxyUsername.isEmpty()) { + final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).getValue(); + MultiAuthenticator authenticator = new MultiAuthenticator.Builder().build(); + authenticator.setProxyUsername(proxyUsername); + authenticator.setProxyPassword(proxyPassword); + 
okHttpClient.setAuthenticator(authenticator); + } } - - useChunked = context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean(); - - okHttpClientAtomicReference.set(okHttpClient); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/ef192cc8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/MultiAuthenticator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/MultiAuthenticator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/MultiAuthenticator.java new file mode 100644 index 0000000..62075d2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/MultiAuthenticator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard.util; + +import java.io.IOException; +import java.net.Proxy; +import java.util.HashMap; +import java.util.Map; + +import com.burgstaller.okhttp.DispatchingAuthenticator; +import com.squareup.okhttp.Authenticator; +import com.squareup.okhttp.Credentials; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.Response; + +public class MultiAuthenticator extends DispatchingAuthenticator { + + public MultiAuthenticator(Map<String, Authenticator> registry) { + super(registry); + } + + private String proxyUsername; + private String proxyPassword; + + @Override + public Request authenticateProxy(Proxy proxy, Response response) throws IOException { + String credential = Credentials.basic(proxyUsername, proxyPassword); + return response.request() + .newBuilder() + .header("Proxy-Authorization", credential) + .build(); + } + + public void setProxyUsername(String proxyUsername) { + this.proxyUsername = proxyUsername; + } + + public void setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + } + + public static final class Builder { + Map<String, Authenticator> registry = new HashMap<>(); + + public Builder with(String scheme, Authenticator authenticator) { + registry.put(scheme, authenticator); + return this; + } + + public MultiAuthenticator build() { + return new MultiAuthenticator(registry); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/ef192cc8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index b99f4b8..84292cc 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@ -144,6 +144,16 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon { } runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort())); + runner.setProperty(InvokeHTTP.PROP_PROXY_USER, "username"); + + try{ + runner.run(); + Assert.fail(); + } catch (AssertionError e){ + // Expect assertion error when proxy password isn't set but host is. + } + runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, "password"); + createFlowFiles(runner); runner.run();