This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new f106f8474a NIFI-13543 Backport HttpRecordSink service to NiFi 1.x
f106f8474a is described below

commit f106f8474a16edfd5cb97b41e4abceec1e144ffb
Author: Jim Steinebrey <[email protected]>
AuthorDate: Fri Sep 6 18:32:49 2024 -0400

    NIFI-13543 Backport HttpRecordSink service to NiFi 1.x
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9243.
---
 .../nifi-record-sink-service/pom.xml               |  36 +++
 .../apache/nifi/record/sink/HttpRecordSink.java    | 281 +++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../nifi/record/sink/TestHttpRecordSink.java       | 303 +++++++++++++++++++++
 4 files changed, 621 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
index e08fbfc553..c447df1beb 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
@@ -59,6 +59,42 @@
             <artifactId>nifi-event-transport</artifactId>
             <version>1.28.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-client-provider-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-client-provider-service</artifactId>
+            <version>1.28.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-proxy-configuration-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.28.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-security-utils-api</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/HttpRecordSink.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/HttpRecordSink.java
new file mode 100644
index 0000000000..150580ae41
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/HttpRecordSink.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.sink;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+@Tags({"http", "post", "record", "sink"})
+@CapabilityDescription("Format and send Records to a configured uri using HTTP 
post. The Record Writer formats the records which are sent as the body of the 
HTTP post request. " +
+        "JsonRecordSetWriter is often used with this processor because many 
HTTP posts require a JSON body.")
+public class HttpRecordSink extends AbstractControllerService implements 
RecordSinkService {
+    protected static final String HEADER_AUTHORIZATION = "Authorization";
+    protected static final String HEADER_CONTENT_TYPE = "Content-Type";
+
+    public static final PropertyDescriptor API_URL = new 
PropertyDescriptor.Builder()
+            .name("API URL")
+            .description("The URL which receives the HTTP requests.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Maximum Batch Size")
+            .description("Specifies the maximum number of records to send in 
the body of each HTTP request. Zero means the batch size is not limited, "
+                    + "and all records are sent together in a single HTTP 
request.")
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("Web Service Client Provider")
+            .description("Controller service to provide the HTTP client for 
sending the HTTP requests.")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+
+    public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("OAuth2 Access Token Provider")
+            .description("OAuth2 service that provides the access tokens for 
the HTTP requests.")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(false)
+            .build();
+
+    private String apiUrl;
+    private int maxBatchSize;
+    private volatile RecordSetWriterFactory writerFactory;
+    private WebClientServiceProvider webClientServiceProvider;
+    private volatile Optional<OAuth2AccessTokenProvider> 
oauth2AccessTokenProviderOptional;
+    Map<String, String> dynamicHttpHeaders;
+
+    public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            MAX_BATCH_SIZE,
+            RECORD_WRITER_FACTORY,
+            WEB_SERVICE_CLIENT_PROVIDER,
+            OAUTH2_ACCESS_TOKEN_PROVIDER
+    ));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Returns a PropertyDescriptor for the given name. This is for the user 
to be able to define their own properties
+     * which will be sent as HTTP headers on the HTTP request
+     *
+     * @param propertyDescriptorName used to lookup if any property 
descriptors exist for that name
+     * @return a PropertyDescriptor object corresponding to the specified 
dynamic property name
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        if (hasProhibitedName(propertyDescriptorName, HEADER_CONTENT_TYPE)) {
+            // Content-Type is case-sensitive for overriding our default 
Content-Type header, so prevent any other combination of upper/lower case 
letters
+            return getInvalidDynamicPropertyDescriptor(propertyDescriptorName, 
"is not allowed. Only exact case of Content-Type is allowed.");
+        }
+
+        if (hasProhibitedName(propertyDescriptorName, HEADER_AUTHORIZATION)) {
+            // Authorization is case-sensitive for overriding our default 
Authorization header, so prevent any other combination of upper/lower case 
letters
+            return getInvalidDynamicPropertyDescriptor(propertyDescriptorName, 
"is not allowed. Only exact case of Authorization is allowed.");
+        }
+
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .dynamic(true)
+                .build();
+    }
+
+    private static boolean hasProhibitedName(String userInput, String 
correctName) {
+        // Do not allow the name followed by ':' in any letter case,
+        // because that is not the correct name
+        // 'correctName' header is case-sensitive for overriding our default 
'correctName' header, so prevent any other combination of upper/lower case 
letters
+        return (correctName + ":").equalsIgnoreCase(userInput)
+                || (correctName.equalsIgnoreCase(userInput) && 
!correctName.equals(userInput));
+    }
+
+    private static PropertyDescriptor 
getInvalidDynamicPropertyDescriptor(String propertyDescriptorName, String 
explanation) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator((subject, input, context) -> new 
ValidationResult.Builder()
+                        .explanation(explanation)
+                        .valid(false)
+                        .subject(subject)
+                        .build())
+                .dynamic(true)
+                .build();
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        apiUrl = 
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        writerFactory = 
context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        webClientServiceProvider = context
+                
.getProperty(WEB_SERVICE_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
+
+        if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) {
+            OAuth2AccessTokenProvider oauth2AccessTokenProvider = context
+                    
.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+            oauth2AccessTokenProvider.getAccessDetails();
+            oauth2AccessTokenProviderOptional = 
Optional.of(oauth2AccessTokenProvider);
+        } else {
+            oauth2AccessTokenProviderOptional = Optional.empty();
+        }
+
+        // Dynamic properties are sent as http headers on the post request.
+        dynamicHttpHeaders = context.getProperties().keySet().stream()
+                .filter(PropertyDescriptor::isDynamic)
+                .collect(Collectors.toMap(
+                        PropertyDescriptor::getName,
+                        p -> 
context.getProperty(p).evaluateAttributeExpressions().getValue()));
+    }
+
+    @Override
+    public WriteResult sendData(RecordSet recordSet, Map<String, String> 
attributes, boolean sendZeroResults) throws IOException {
+        WriteResult writeResult;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), recordSet.getSchema(), baos, 
attributes)) {
+            writeResult = sendRecords(recordSet, writer, baos, maxBatchSize);
+        } catch (SchemaNotFoundException e) {
+            final String errorMessage = String.format("RecordSetWriter could 
not be created because the schema was not found. The schema name for the 
RecordSet to write is %s",
+                    recordSet.getSchema().getSchemaName());
+            throw new IOException(errorMessage, e);
+        }
+
+        return writeResult;
+    }
+
+    private WriteResult sendRecords(final RecordSet recordSet, final 
RecordSetWriter writer, final ByteArrayOutputStream baos, int maxBatchSize) 
throws IOException {
+        WriteResult writeResult = WriteResult.EMPTY;
+        Record r = recordSet.next();
+        if (r != null) {
+            int batchCount = 0;
+            do {
+                if (maxBatchSize != 1 && batchCount == 0) {
+                    // If maxBatchSize is one, then do NOT write record set 
begin or end markers because
+                    // each single record is sent alone without being in an 
array.
+                    writer.beginRecordSet();
+                }
+
+                writeResult = writer.write(r);
+                batchCount++;
+
+                r = recordSet.next();
+
+                // If this is last record, then send current group of records.
+                // OR if we have processed maxBatchSize records, then send 
current group of records.
+                // Unless maxBatchSize is 0, which means to send all records 
together in one batch at the end.
+                if (r == null || (maxBatchSize > 0 && batchCount >= 
maxBatchSize)) {
+                    if (maxBatchSize != 1) {
+                        writeResult = writer.finishRecordSet();
+                    }
+                    writer.flush();
+                    sendHttpRequest(baos.toByteArray(), writer.getMimeType());
+                    baos.reset();
+                    batchCount = 0;
+                }
+            } while (r != null);
+        }
+        return writeResult;
+    }
+
+    public void sendHttpRequest(final byte[] body, String mimeType) throws 
IOException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = 
webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath());
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        HttpRequestBodySpec requestBodySpec = 
webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri);
+
+        dynamicHttpHeaders.forEach(requestBodySpec::header);
+
+        if (StringUtils.isNotBlank(mimeType) && 
!dynamicHttpHeaders.containsKey(HEADER_CONTENT_TYPE)) {
+            requestBodySpec.header(HEADER_CONTENT_TYPE, mimeType);
+        }
+
+        if (!dynamicHttpHeaders.containsKey(HEADER_AUTHORIZATION)) {
+            
oauth2AccessTokenProviderOptional.ifPresent(oauth2AccessTokenProvider ->
+                    requestBodySpec.header(HEADER_AUTHORIZATION, "Bearer " + 
oauth2AccessTokenProvider.getAccessDetails().getAccessToken()));
+        }
+
+        final InputStream requestBodyInputStream = new 
ByteArrayInputStream(body);
+
+        try (final HttpResponseEntity response = requestBodySpec
+                .body(requestBodyInputStream, 
OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new IOException(String.format("HTTP request failed with 
status code: %s for url: %s and returned response body: %s",
+                        statusCode, uri.toString(), response.body() == null ? 
"none" : IOUtils.toString(response.body(), StandardCharsets.UTF_8)));
+            }
+        } catch (final IOException ioe) {
+            throw ioe;
+        } catch (final Exception e) {
+            throw new IOException(String.format("HttpRecordSink HTTP request 
transmission failed for url: %s", uri.toString()), e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 4ca4073bf2..011851f6bf 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -16,3 +16,4 @@ org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup
 org.apache.nifi.record.sink.LoggingRecordSink
 org.apache.nifi.record.sink.EmailRecordSink
 org.apache.nifi.record.sink.event.UDPEventRecordSink
+org.apache.nifi.record.sink.HttpRecordSink
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestHttpRecordSink.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestHttpRecordSink.java
new file mode 100644
index 0000000000..0c485ae707
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestHttpRecordSink.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import 
org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Answers;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHttpRecordSink {
+    public static final String ID = "id";
+    public static final String NAME = "name";
+    public static final String ACTIVE = "active";
+
+    private TestRunner testRunner;
+    private MockWebServer mockWebServer;
+    private HttpRecordSink httpRecordSink;
+    private RecordSetWriterFactory writerFactory;
+    final private String OAUTH_ACCESS_TOKEN = "access_token";
+
+    private static RecordSchema schema;
+    private static Record[] records;
+    private ObjectMapper mapper;
+
+    @BeforeAll
+    public static void setupOnce() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField(ID, RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField(NAME, 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField(ACTIVE, 
RecordFieldType.BOOLEAN.getDataType()));
+
+        schema = new SimpleRecordSchema(fields);
+
+        final Record record0 = createRecord(schema, 0);
+        final Record record1 = createRecord(schema, 1);
+        final Record record2 = createRecord(schema, 2);
+        final Record record3 = createRecord(schema, 3);
+        final Record record4 = createRecord(schema, 4);
+        records = new Record[] {record0, record1, record2, record3, record4 };
+    }
+
+    private static Record createRecord(final RecordSchema schema, final int 
index) {
+        final Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ID, index);
+        valueMap.put(NAME, "Name_äöü_こんにちは世界_" + index);
+        valueMap.put(ACTIVE, index % 2 == 0);
+        return new MapRecord(schema, valueMap);
+    }
+
+    private static RecordSet createRecordSetWithSize(final int size) {
+        return RecordSet.of(schema, Arrays.copyOf(records, size));
+    }
+
+    @BeforeEach
+    public void setupEachTest() throws InitializationException, IOException {
+        mapper = new ObjectMapper();
+
+        mockWebServer = new MockWebServer();
+        mockWebServer.start();
+        String url = mockWebServer.url("/api/test").toString();
+
+        testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+        final WebClientServiceProvider webClientServiceProvider = new 
StandardWebClientServiceProvider();
+        testRunner.addControllerService("webClientServiceProvider", 
webClientServiceProvider);
+        testRunner.enableControllerService(webClientServiceProvider);
+
+        httpRecordSink = new HttpRecordSink();
+
+        testRunner.addControllerService("httpRecordSink", httpRecordSink);
+        testRunner.setProperty(httpRecordSink, HttpRecordSink.API_URL, url);
+        testRunner.setProperty(httpRecordSink, 
HttpRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "webClientServiceProvider");
+
+        writerFactory = new JsonRecordSetWriter();
+        testRunner.addControllerService("writer", writerFactory);
+        testRunner.setProperty(httpRecordSink, 
HttpRecordSink.RECORD_WRITER_FACTORY, "writer");
+
+        setupOAuth2TokenProvider();
+    }
+
+    private void setupOAuth2TokenProvider() throws InitializationException {
+        String oauth2AccessTokenProviderId = "oauth2AccessTokenProviderId";
+
+        OAuth2AccessTokenProvider oauth2AccessTokenProvider = 
mock(OAuth2AccessTokenProvider.class, Answers.RETURNS_DEEP_STUBS);
+        
when(oauth2AccessTokenProvider.getIdentifier()).thenReturn(oauth2AccessTokenProviderId);
+        
when(oauth2AccessTokenProvider.getAccessDetails().getAccessToken()).thenReturn(OAUTH_ACCESS_TOKEN);
+
+        testRunner.addControllerService(oauth2AccessTokenProviderId, 
oauth2AccessTokenProvider);
+        testRunner.enableControllerService(oauth2AccessTokenProvider);
+
+        testRunner.setProperty(httpRecordSink, 
HttpRecordSink.OAUTH2_ACCESS_TOKEN_PROVIDER, oauth2AccessTokenProviderId);
+    }
+
+    @AfterEach
+    public void cleanUpEachTest() throws IOException {
+        mockWebServer.shutdown();
+    }
+
+    @Test
+    public void testInvalidIfApiUrlEmpty() {
+        testRunner.setProperty(httpRecordSink, HttpRecordSink.API_URL, "");
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertNotValid(httpRecordSink);
+    }
+
+    @Test
+    public void testInvalidIfWebClientServiceDoesNotExist() {
+        testRunner.setProperty(httpRecordSink, 
HttpRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "nonexistent");
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertNotValid(httpRecordSink);
+    }
+
+    @Test
+    public void testValidContentTypeHeader() throws Exception {
+        testRunner.setProperty(httpRecordSink, "Content-Type", 
"my_content_type");
+        testRunner.setProperty(httpRecordSink, "RandomHeader", "random_value");
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(httpRecordSink);
+
+        testRunner.disableControllerService(writerFactory);
+        testSendData(5, 2, "my_content_type", null);
+    }
+
+    @Test
+    public void testInvalidContentTypeHeader() {
+        testRunner.setProperty(httpRecordSink, "content-type", "anything");
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertNotValid(httpRecordSink);
+    }
+
+    @Test
+    public void testValidAuthorizationDynamicHeader() throws Exception {
+        testRunner.setProperty(httpRecordSink, "Authorization", "Bearer 
my_authorization");
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(httpRecordSink);
+
+        testRunner.disableControllerService(writerFactory);
+        testSendData(3, 1, null, "my_authorization");
+    }
+
+    @Test
+    public void testInvalidAuthorizationDynamicHeader() {
+        testRunner.setProperty(httpRecordSink, "authorization", "anything");
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertNotValid(httpRecordSink);
+    }
+
+    @Test
+    public void testSendDataBatchSize0() throws Exception {
+        testSendData(5, 0);
+    }
+
+    @Test
+    public void testSendDataBatchSize1() throws Exception {
+        testSendData(4, 1);
+    }
+
+    @Test
+    public void testSendDataBatchSize2() throws Exception {
+        testSendData(2, 2);
+    }
+
+    @Test
+    public void testSendDataBatchSize3() throws Exception {
+        testSendData(2, 3);
+    }
+
+    @Test
+    public void testSendDataBatchSize4() throws Exception {
+        testSendData(5, 4);
+    }
+
+    @Test
+    public void testSendDataBatchSize5() throws Exception {
+        testSendData(2, 5);
+    }
+
+    public void testSendData(int recordCount, int maxBatchSize) throws 
Exception {
+        testSendData(recordCount, maxBatchSize, null, null);
+    }
+
+    public void testSendData(int recordCount, int maxBatchSize,
+                             String expectedContentType, String 
expectedAuthorization) throws Exception {
+        RecordSet recordSetIn = createRecordSetWithSize(recordCount);
+        int expectedRequestCount = maxBatchSize == 0
+                ? 1
+                : recordCount / maxBatchSize + ((recordCount % maxBatchSize == 
0) ? 0 : 1);
+        testRunner.setProperty(httpRecordSink, HttpRecordSink.MAX_BATCH_SIZE, 
String.valueOf(maxBatchSize));
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(httpRecordSink);
+        testRunner.enableControllerService(httpRecordSink);
+
+        for (int i = 0; i < expectedRequestCount; i++) {
+            mockWebServer.enqueue(new MockResponse());
+        }
+
+        final WriteResult writeResult = httpRecordSink.sendData(recordSetIn, 
Collections.emptyMap(), false);
+
+        assertNotNull(writeResult);
+        assertEquals(recordCount, writeResult.getRecordCount());
+        assertEquals(Collections.EMPTY_MAP, writeResult.getAttributes());
+
+        assertEquals(expectedRequestCount, mockWebServer.getRequestCount());
+
+        for (int i = 0; i < expectedRequestCount; i++) {
+            RecordedRequest recordedRequest = mockWebServer.takeRequest();
+            String requestBody = 
recordedRequest.getBody().readString(StandardCharsets.UTF_8);
+            Person[] people =
+                    (maxBatchSize == 1)
+                            ? new Person[] {
+                            // For maxBatchSize 1, person is not in a Json 
array
+                            mapper.readValue(requestBody, Person.class)
+                    }
+                            : mapper.readValue(requestBody, Person[].class); 
// Otherwise the body is a json array
+
+            for (int personIndex = 0; personIndex < people.length; 
personIndex++) {
+                final int compareIndex = i * maxBatchSize + personIndex;
+                assertTrue(people[personIndex].equals(records[compareIndex]), 
"Mismatch - Expected: " + records[compareIndex].toMap().toString() +
+                        " Actual: {" + people[personIndex].toString() + "} 
order of fields can be ignored.");
+            }
+            String actualContentTypeHeader = 
recordedRequest.getHeader(HttpRecordSink.HEADER_CONTENT_TYPE);
+            assertEquals(expectedContentType != null ? expectedContentType : 
"application/json", actualContentTypeHeader);
+
+            String actualAuthorizationHeader = 
recordedRequest.getHeader(HttpRecordSink.HEADER_AUTHORIZATION);
+            assertEquals("Bearer " + (expectedAuthorization != null ? 
expectedAuthorization : OAUTH_ACCESS_TOKEN),
+                    actualAuthorizationHeader);
+        }
+    }
+
+    static public class Person {
+        public int id;
+        public String name;
+        public boolean active;
+
+        public boolean equals(Record record) {
+            return id == record.getAsInt(ID)
+                    && name.equals(record.getAsString(NAME))
+                    && active == record.getAsBoolean(ACTIVE);
+        }
+
+        public String toString() {
+            return ID + "=" + id + ", " + NAME + "=" + name + ", " + ACTIVE + 
"=" + active;
+        }
+    }
+}
\ No newline at end of file

Reply via email to