Repository: nifi Updated Branches: refs/heads/master ccedc71c8 -> de1ad3eb6
NIFI-4198 *ElasticsearchHttp processors do not expose Proxy settings This closes #2094. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01a01a3d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01a01a3d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01a01a3d Branch: refs/heads/master Commit: 01a01a3dfc1769988be82b43f1eb8c9340e2e159 Parents: ccedc71 Author: Arun Manivannan <[email protected]> Authored: Thu Aug 17 01:30:05 2017 +0800 Committer: Koji Kawamura <[email protected]> Committed: Wed May 16 14:01:19 2018 +0900 ---------------------------------------------------------------------- .../AbstractElasticsearchHttpProcessor.java | 55 ++++++++++++++++++ .../elasticsearch/FetchElasticsearchHttp.java | 4 +- .../elasticsearch/PutElasticsearchHttp.java | 4 +- .../elasticsearch/QueryElasticsearchHttp.java | 4 +- .../elasticsearch/ScrollElasticsearchHttp.java | 4 +- .../TestFetchElasticsearchHttp.java | 52 +++++++++++++++++ .../elasticsearch/TestPutElasticsearchHttp.java | 54 ++++++++++++++++++ .../TestQueryElasticsearchHttp.java | 60 ++++++++++++++++++++ 8 files changed, 233 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index 2ed6f79..83f4560 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -18,11 +18,13 @@ package org.apache.nifi.processors.elasticsearch; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.Authenticator; import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import okhttp3.Route; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -42,6 +44,7 @@ import java.net.Proxy; import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -82,6 +85,22 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder() + .name("proxy-username") + .displayName("Proxy Username") + .description("Proxy Username") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder() + .name("proxy-password") + .displayName("Proxy Password") + .description("Proxy Password") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); + public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() .name("elasticsearch-http-connect-timeout") .displayName("Connection Timeout") @@ -115,6 +134,27 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .build(); } + private static final List<PropertyDescriptor> propertyDescriptors; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(ES_URL); + properties.add(PROXY_HOST); + properties.add(PROXY_PORT); + properties.add(PROXY_USERNAME); + properties.add(PROXY_PASSWORD); + properties.add(RESPONSE_TIMEOUT); + + propertyDescriptors = Collections.unmodifiableList(properties); + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { okHttpClientAtomicReference.set(null); @@ -129,6 +169,21 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic okHttpClient.proxy(proxy); } + final String proxyUsername = context.getProperty(PROXY_USERNAME).getValue(); + final String proxyPassword = context.getProperty(PROXY_PASSWORD).getValue(); + + if (proxyUsername != null && proxyPassword != null){ + okHttpClient.proxyAuthenticator(new Authenticator() { + @Override + public Request authenticate(Route route, Response response) throws IOException { + final String credential=Credentials.basic(proxyUsername, proxyPassword); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + } + }); + } + // Set timeouts okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index 2c692df..e069c72 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -169,7 +169,9 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index caf1e01..cd1092a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -174,7 +174,9 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index 15ac65d..b1f860f 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -257,7 +257,9 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index f6f351a..f08e33c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -207,7 +207,9 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index bef54d4..22065af 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -445,4 +445,56 @@ public class TestFetchElasticsearchHttp { runner.run(100); runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100); } + + @Test + @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") + public void testFetchElasticsearchBasicBehindProxy() { + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setProperty(FetchElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(FetchElasticsearchHttp.PROXY_PORT, "3228"); + runner.setProperty(FetchElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.") + public void testFetchElasticsearchBasicBehindAuthenticatedProxy() { + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setProperty(FetchElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(FetchElasticsearchHttp.PROXY_PORT, "3328"); + runner.setProperty(FetchElasticsearchHttp.PROXY_USERNAME, "squid"); + runner.setProperty(FetchElasticsearchHttp.PROXY_PASSWORD, "changeme"); + runner.setProperty(FetchElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 0d36edd..c3099a3 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -499,4 +499,58 @@ public class TestPutElasticsearchHttp { runner.run(); runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 100); } + + @Test + @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") + public void testPutElasticSearchBasicBehindProxy() { + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.setProperty(PutElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(PutElasticsearchHttp.PROXY_PORT, "3228"); + runner.setProperty(PutElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.") + public void testPutElasticSearchBasicBehindAuthenticatedProxy() { + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.setProperty(PutElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(PutElasticsearchHttp.PROXY_PORT, "3328"); + runner.setProperty(PutElasticsearchHttp.PROXY_USERNAME, "squid"); + runner.setProperty(PutElasticsearchHttp.PROXY_PASSWORD, "changeme"); + runner.setProperty(PutElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/01a01a3d/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index 2863264..50b950f 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -37,6 +37,7 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; +import org.junit.Ignore; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -370,6 +371,65 @@ public class TestQueryElasticsearchHttp { runner.run(1, true, true); } + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution. + // However if you wish to execute them as part of a test phase, comment out the @Ignored line for each + // desired test. + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Test + @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") + public void testQueryElasticsearchBasicBehindProxy() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new QueryElasticsearchHttp()); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location"); + + runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228"); + runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.enqueue("".getBytes(), new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.") + public void testQueryElasticsearchBasicBehindAuthenticatedProxy() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new QueryElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location"); + + runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3328"); + runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid"); + runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme"); + runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.enqueue("".getBytes(), new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 1); + } + @Test public void testQueryElasticsearchOnTrigger_withQueryParameters() throws IOException { QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
