This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new f106f8474a NIFI-13543 Backport HttpRecordSink service to NiFi 1.x
f106f8474a is described below
commit f106f8474a16edfd5cb97b41e4abceec1e144ffb
Author: Jim Steinebrey <[email protected]>
AuthorDate: Fri Sep 6 18:32:49 2024 -0400
NIFI-13543 Backport HttpRecordSink service to NiFi 1.x
Signed-off-by: Pierre Villard <[email protected]>
This closes #9243.
---
.../nifi-record-sink-service/pom.xml | 36 +++
.../apache/nifi/record/sink/HttpRecordSink.java | 281 +++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 1 +
.../nifi/record/sink/TestHttpRecordSink.java | 303 +++++++++++++++++++++
4 files changed, 621 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
index e08fbfc553..c447df1beb 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
@@ -59,6 +59,42 @@
<artifactId>nifi-event-transport</artifactId>
<version>1.28.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-client-provider-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-client-provider-service</artifactId>
+ <version>1.28.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-proxy-configuration-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.28.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils-api</artifactId>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/HttpRecordSink.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/HttpRecordSink.java
new file mode 100644
index 0000000000..150580ae41
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/HttpRecordSink.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.sink;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+@Tags({"http", "post", "record", "sink"})
+@CapabilityDescription("Format and send Records to a configured uri using HTTP
post. The Record Writer formats the records which are sent as the body of the
HTTP post request. " +
+ "JsonRecordSetWriter is often used with this processor because many
HTTP posts require a JSON body.")
+public class HttpRecordSink extends AbstractControllerService implements
RecordSinkService {
+ protected static final String HEADER_AUTHORIZATION = "Authorization";
+ protected static final String HEADER_CONTENT_TYPE = "Content-Type";
+
+ public static final PropertyDescriptor API_URL = new
PropertyDescriptor.Builder()
+ .name("API URL")
+ .description("The URL which receives the HTTP requests.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Maximum Batch Size")
+ .description("Specifies the maximum number of records to send in
the body of each HTTP request. Zero means the batch size is not limited, "
+ + "and all records are sent together in a single HTTP
request.")
+ .defaultValue("0")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("Web Service Client Provider")
+ .description("Controller service to provide the HTTP client for
sending the HTTP requests.")
+ .required(true)
+ .identifiesControllerService(WebClientServiceProvider.class)
+ .build();
+
+ public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("OAuth2 Access Token Provider")
+ .description("OAuth2 service that provides the access tokens for
the HTTP requests.")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(false)
+ .build();
+
+ private String apiUrl;
+ private int maxBatchSize;
+ private volatile RecordSetWriterFactory writerFactory;
+ private WebClientServiceProvider webClientServiceProvider;
+ private volatile Optional<OAuth2AccessTokenProvider>
oauth2AccessTokenProviderOptional;
+ Map<String, String> dynamicHttpHeaders;
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ API_URL,
+ MAX_BATCH_SIZE,
+ RECORD_WRITER_FACTORY,
+ WEB_SERVICE_CLIENT_PROVIDER,
+ OAUTH2_ACCESS_TOKEN_PROVIDER
+ ));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ /**
+ * Returns a PropertyDescriptor for the given name. This is for the user
to be able to define their own properties
+ * which will sent as HTTP headers on the HTTP request
+ *
+ * @param propertyDescriptorName used to lookup if any property
descriptors exist for that name
+ * @return a PropertyDescriptor object corresponding to the specified
dynamic property name
+ */
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ if (hasProhibitedName(propertyDescriptorName, HEADER_CONTENT_TYPE)) {
+ // Content-Type is case-sensitive for overriding our default
Content-Type header, so prevent any other combination of upper/lower case
letters
+ return getInvalidDynamicPropertyDescriptor(propertyDescriptorName,
"is not allowed. Only exact case of Content-Type is allowed.");
+ }
+
+ if (hasProhibitedName(propertyDescriptorName, HEADER_AUTHORIZATION)) {
+ // Authorization is case-sensitive for overriding our default
Authorization header, so prevent any other combination of upper/lower case
letters
+ return getInvalidDynamicPropertyDescriptor(propertyDescriptorName,
"is not allowed. Only exact case of Authorization is allowed.");
+ }
+
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamic(true)
+ .build();
+ }
+
+ private static boolean hasProhibitedName(String userInput, String
correctName) {
+ // Do not allow : in any case be
+ // cause it is not the correct name
+ // 'correctName' header is case-sensitive for overriding our default
'correctName' header, so prevent any other combination of upper/lower case
letters
+ return (correctName + ":").equalsIgnoreCase(userInput)
+ || (correctName.equalsIgnoreCase(userInput) &&
!correctName.equals(userInput));
+ }
+
+ private static PropertyDescriptor
getInvalidDynamicPropertyDescriptor(String propertyDescriptorName, String
explanation) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .addValidator((subject, input, context) -> new
ValidationResult.Builder()
+ .explanation(explanation)
+ .valid(false)
+ .subject(subject)
+ .build())
+ .dynamic(true)
+ .build();
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ apiUrl =
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+ maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+ writerFactory =
context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+ webClientServiceProvider = context
+
.getProperty(WEB_SERVICE_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
+
+ if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) {
+ OAuth2AccessTokenProvider oauth2AccessTokenProvider = context
+
.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ oauth2AccessTokenProvider.getAccessDetails();
+ oauth2AccessTokenProviderOptional =
Optional.of(oauth2AccessTokenProvider);
+ } else {
+ oauth2AccessTokenProviderOptional = Optional.empty();
+ }
+
+ // Dynamic properties are sent as http headers on the post request.
+ dynamicHttpHeaders = context.getProperties().keySet().stream()
+ .filter(PropertyDescriptor::isDynamic)
+ .collect(Collectors.toMap(
+ PropertyDescriptor::getName,
+ p ->
context.getProperty(p).evaluateAttributeExpressions().getValue()));
+ }
+
+ @Override
+ public WriteResult sendData(RecordSet recordSet, Map<String, String>
attributes, boolean sendZeroResults) throws IOException {
+ WriteResult writeResult;
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), recordSet.getSchema(), baos,
attributes)) {
+ writeResult = sendRecords(recordSet, writer, baos, maxBatchSize);
+ } catch (SchemaNotFoundException e) {
+ final String errorMessage = String.format("RecordSetWriter could
not be created because the schema was not found. The schema name for the
RecordSet to write is %s",
+ recordSet.getSchema().getSchemaName());
+ throw new IOException(errorMessage, e);
+ }
+
+ return writeResult;
+ }
+
+ private WriteResult sendRecords(final RecordSet recordSet, final
RecordSetWriter writer, final ByteArrayOutputStream baos, int maxBatchSize)
throws IOException {
+ WriteResult writeResult = WriteResult.EMPTY;
+ Record r = recordSet.next();
+ if (r != null) {
+ int batchCount = 0;
+ do {
+ if (maxBatchSize != 1 && batchCount == 0) {
+ // If maxBatchSize is one, then do NOT write record set
begin or end markers because
+ // each single record is sent alone without being in an
array.
+ writer.beginRecordSet();
+ }
+
+ writeResult = writer.write(r);
+ batchCount++;
+
+ r = recordSet.next();
+
+ // If this is last record, then send current group of records.
+ // OR if we have processed maxBatchSize records, then send
current group of records.
+ // Unless batchCount is 0, which means to send all records
together in one batch at the end.
+ if (r == null || (maxBatchSize > 0 && batchCount >=
maxBatchSize)) {
+ if (maxBatchSize != 1) {
+ writeResult = writer.finishRecordSet();
+ }
+ writer.flush();
+ sendHttpRequest(baos.toByteArray(), writer.getMimeType());
+ baos.reset();
+ batchCount = 0;
+ }
+ } while (r != null);
+ }
+ return writeResult;
+ }
+
+ public void sendHttpRequest(final byte[] body, String mimeType) throws
IOException {
+ final URI apiUri = URI.create(apiUrl);
+ final HttpUriBuilder uriBuilder =
webClientServiceProvider.getHttpUriBuilder()
+ .scheme(apiUri.getScheme())
+ .host(apiUri.getHost())
+ .encodedPath(apiUri.getPath());
+ if (apiUri.getPort() != -1) {
+ uriBuilder.port(apiUri.getPort());
+ }
+ final URI uri = uriBuilder.build();
+
+ HttpRequestBodySpec requestBodySpec =
webClientServiceProvider.getWebClientService()
+ .post()
+ .uri(uri);
+
+ dynamicHttpHeaders.forEach(requestBodySpec::header);
+
+ if (StringUtils.isNotBlank(mimeType) &&
!dynamicHttpHeaders.containsKey(HEADER_CONTENT_TYPE)) {
+ requestBodySpec.header(HEADER_CONTENT_TYPE, mimeType);
+ }
+
+ if (!dynamicHttpHeaders.containsKey(HEADER_AUTHORIZATION)) {
+
oauth2AccessTokenProviderOptional.ifPresent(oauth2AccessTokenProvider ->
+ requestBodySpec.header(HEADER_AUTHORIZATION, "Bearer " +
oauth2AccessTokenProvider.getAccessDetails().getAccessToken()));
+ }
+
+ final InputStream requestBodyInputStream = new
ByteArrayInputStream(body);
+
+ try (final HttpResponseEntity response = requestBodySpec
+ .body(requestBodyInputStream,
OptionalLong.of(requestBodyInputStream.available()))
+ .retrieve()) {
+ final int statusCode = response.statusCode();
+ if (!(statusCode >= 200 && statusCode < 300)) {
+ throw new IOException(String.format("HTTP request failed with
status code: %s for url: %s and returned response body: %s",
+ statusCode, uri.toString(), response.body() == null ?
"none" : IOUtils.toString(response.body(), StandardCharsets.UTF_8)));
+ }
+ } catch (final IOException ioe) {
+ throw ioe;
+ } catch (final Exception e) {
+ throw new IOException(String.format("HttpRecordSink HTTP request
transmission failed for url: %s", uri.toString()), e);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 4ca4073bf2..011851f6bf 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -16,3 +16,4 @@ org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup
org.apache.nifi.record.sink.LoggingRecordSink
org.apache.nifi.record.sink.EmailRecordSink
org.apache.nifi.record.sink.event.UDPEventRecordSink
+org.apache.nifi.record.sink.HttpRecordSink
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestHttpRecordSink.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestHttpRecordSink.java
new file mode 100644
index 0000000000..0c485ae707
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestHttpRecordSink.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import
org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Answers;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHttpRecordSink {
+ public static final String ID = "id";
+ public static final String NAME = "name";
+ public static final String ACTIVE = "active";
+
+ private TestRunner testRunner;
+ private MockWebServer mockWebServer;
+ private HttpRecordSink httpRecordSink;
+ private RecordSetWriterFactory writerFactory;
+ final private String OAUTH_ACCESS_TOKEN = "access_token";
+
+ private static RecordSchema schema;
+ private static Record[] records;
+ private ObjectMapper mapper;
+
+ @BeforeAll
+ public static void setupOnce() {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField(ID, RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField(NAME,
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField(ACTIVE,
RecordFieldType.BOOLEAN.getDataType()));
+
+ schema = new SimpleRecordSchema(fields);
+
+ final Record record0 = createRecord(schema, 0);
+ final Record record1 = createRecord(schema, 1);
+ final Record record2 = createRecord(schema, 2);
+ final Record record3 = createRecord(schema, 3);
+ final Record record4 = createRecord(schema, 4);
+ records = new Record[] {record0, record1, record2, record3, record4 };
+ }
+
+ private static Record createRecord(final RecordSchema schema, final int
index) {
+ final Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put(ID, index);
+ valueMap.put(NAME, "Name_äöü_こんにちは世界_" + index);
+ valueMap.put(ACTIVE, index % 2 == 0);
+ return new MapRecord(schema, valueMap);
+ }
+
+ private static RecordSet createRecordSetWithSize(final int size) {
+ return RecordSet.of(schema, Arrays.copyOf(records, size));
+ }
+
+ @BeforeEach
+ public void setupEachTest() throws InitializationException, IOException {
+ mapper = new ObjectMapper();
+
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+ String url = mockWebServer.url("/api/test").toString();
+
+ testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+ final WebClientServiceProvider webClientServiceProvider = new
StandardWebClientServiceProvider();
+ testRunner.addControllerService("webClientServiceProvider",
webClientServiceProvider);
+ testRunner.enableControllerService(webClientServiceProvider);
+
+ httpRecordSink = new HttpRecordSink();
+
+ testRunner.addControllerService("httpRecordSink", httpRecordSink);
+ testRunner.setProperty(httpRecordSink, HttpRecordSink.API_URL, url);
+ testRunner.setProperty(httpRecordSink,
HttpRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "webClientServiceProvider");
+
+ writerFactory = new JsonRecordSetWriter();
+ testRunner.addControllerService("writer", writerFactory);
+ testRunner.setProperty(httpRecordSink,
HttpRecordSink.RECORD_WRITER_FACTORY, "writer");
+
+ setupOAuth2TokenProvider();
+ }
+
+ private void setupOAuth2TokenProvider() throws InitializationException {
+ String oauth2AccessTokenProviderId = "oauth2AccessTokenProviderId";
+
+ OAuth2AccessTokenProvider oauth2AccessTokenProvider =
mock(OAuth2AccessTokenProvider.class, Answers.RETURNS_DEEP_STUBS);
+
when(oauth2AccessTokenProvider.getIdentifier()).thenReturn(oauth2AccessTokenProviderId);
+
when(oauth2AccessTokenProvider.getAccessDetails().getAccessToken()).thenReturn(OAUTH_ACCESS_TOKEN);
+
+ testRunner.addControllerService(oauth2AccessTokenProviderId,
oauth2AccessTokenProvider);
+ testRunner.enableControllerService(oauth2AccessTokenProvider);
+
+ testRunner.setProperty(httpRecordSink,
HttpRecordSink.OAUTH2_ACCESS_TOKEN_PROVIDER, oauth2AccessTokenProviderId);
+ }
+
+ @AfterEach
+ public void cleanUpEachTest() throws IOException {
+ mockWebServer.shutdown();
+ }
+
+ @Test
+ public void testInvalidIfApiUrlEmpty() {
+ testRunner.setProperty(httpRecordSink, HttpRecordSink.API_URL, "");
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertNotValid(httpRecordSink);
+ }
+
+ @Test
+ public void testInvalidIfWebClientServiceDoesNotExist() {
+ testRunner.setProperty(httpRecordSink,
HttpRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "nonexistent");
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertNotValid(httpRecordSink);
+ }
+
+ @Test
+ public void testValidContentTypeHeader() throws Exception {
+ testRunner.setProperty(httpRecordSink, "Content-Type",
"my_content_type");
+ testRunner.setProperty(httpRecordSink, "RandomHeader", "random_value");
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(httpRecordSink);
+
+ testRunner.disableControllerService(writerFactory);
+ testSendData(5, 2, "my_content_type", null);
+ }
+
+ @Test
+ public void testInvalidContentTypeHeader() {
+ testRunner.setProperty(httpRecordSink, "content-type", "anything");
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertNotValid(httpRecordSink);
+ }
+
+ @Test
+ public void testValidAuthorizationDynamicHeader() throws Exception {
+ testRunner.setProperty(httpRecordSink, "Authorization", "Bearer
my_authorization");
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(httpRecordSink);
+
+ testRunner.disableControllerService(writerFactory);
+ testSendData(3, 1, null, "my_authorization");
+ }
+
+ @Test
+ public void testInvalidAuthorizationDynamicHeader() {
+ testRunner.setProperty(httpRecordSink, "authorization", "anything");
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertNotValid(httpRecordSink);
+ }
+
+ @Test
+ public void testSendDataBatchSize0() throws Exception {
+ testSendData(5, 0);
+ }
+
+ @Test
+ public void testSendDataBatchSize1() throws Exception {
+ testSendData(4, 1);
+ }
+
+ @Test
+ public void testSendDataBatchSize2() throws Exception {
+ testSendData(2, 2);
+ }
+
+ @Test
+ public void testSendDataBatchSize3() throws Exception {
+ testSendData(2, 3);
+ }
+
+ @Test
+ public void testSendDataBatchSize4() throws Exception {
+ testSendData(5, 4);
+ }
+
+ @Test
+ public void testSendDataBatchSize5() throws Exception {
+ testSendData(2, 5);
+ }
+
+ public void testSendData(int recordCount, int maxBatchSize) throws
Exception {
+ testSendData(recordCount, maxBatchSize, null, null);
+ }
+
+ public void testSendData(int recordCount, int maxBatchSize,
+ String expectedContentType, String
expectedAuthorization) throws Exception {
+ RecordSet recordSetIn = createRecordSetWithSize(recordCount);
+ int expectedRequestCount = maxBatchSize == 0
+ ? 1
+ : recordCount / maxBatchSize + ((recordCount % maxBatchSize ==
0) ? 0 : 1);
+ testRunner.setProperty(httpRecordSink, HttpRecordSink.MAX_BATCH_SIZE,
String.valueOf(maxBatchSize));
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(httpRecordSink);
+ testRunner.enableControllerService(httpRecordSink);
+
+ for (int i = 0; i < expectedRequestCount; i++) {
+ mockWebServer.enqueue(new MockResponse());
+ }
+
+ final WriteResult writeResult = httpRecordSink.sendData(recordSetIn,
Collections.emptyMap(), false);
+
+ assertNotNull(writeResult);
+ assertEquals(recordCount, writeResult.getRecordCount());
+ assertEquals(Collections.EMPTY_MAP, writeResult.getAttributes());
+
+ assertEquals(expectedRequestCount, mockWebServer.getRequestCount());
+
+ for (int i = 0; i < expectedRequestCount; i++) {
+ RecordedRequest recordedRequest = mockWebServer.takeRequest();
+ String requestBody =
recordedRequest.getBody().readString(StandardCharsets.UTF_8);
+ Person[] people =
+ (maxBatchSize == 1)
+ ? new Person[] {
+ // For maxBatchSize 1, person is not in a Json
array
+ mapper.readValue(requestBody, Person.class)
+ }
+ : mapper.readValue(requestBody, Person[].class);
// Otherwise the body is a json array
+
+ for (int personIndex = 0; personIndex < people.length;
personIndex++) {
+ final int compareIndex = i * maxBatchSize + personIndex;
+ assertTrue(people[personIndex].equals(records[compareIndex]),
"Mismatch - Expected: " + records[compareIndex].toMap().toString() +
+ " Actual: {" + people[personIndex].toString() + "}
order of fields can be ignored.");
+ }
+ String actualContentTypeHeader =
recordedRequest.getHeader(HttpRecordSink.HEADER_CONTENT_TYPE);
+ assertEquals(expectedContentType != null ? expectedContentType :
"application/json", actualContentTypeHeader);
+
+ String actualAuthorizationHeader =
recordedRequest.getHeader(HttpRecordSink.HEADER_AUTHORIZATION);
+ assertEquals("Bearer " + (expectedAuthorization != null ?
expectedAuthorization : OAUTH_ACCESS_TOKEN),
+ actualAuthorizationHeader);
+ }
+ }
+
+ static public class Person {
+ public int id;
+ public String name;
+ public boolean active;
+
+ public boolean equals(Record record) {
+ return id == record.getAsInt(ID)
+ && name.equals(record.getAsString(NAME))
+ && active == record.getAsBoolean(ACTIVE);
+ }
+
+ public String toString() {
+ return ID + "=" + id + ", " + NAME + "=" + name + ", " + ACTIVE +
"=" + active;
+ }
+ }
+}
\ No newline at end of file