Lehel44 commented on code in PR #7644: URL: https://github.com/apache/nifi/pull/7644#discussion_r1323690358
########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-services/src/main/java/org/apache/nifi/services/zendesk/ZendeskRecordSink.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.zendesk; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.common.zendesk.ZendeskAuthenticationContext; +import org.apache.nifi.common.zendesk.ZendeskAuthenticationType; +import org.apache.nifi.common.zendesk.ZendeskClient; +import org.apache.nifi.common.zendesk.validation.JsonPointerPropertyNameValidator; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import 
org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static org.apache.nifi.common.zendesk.ZendeskProperties.WEB_CLIENT_SERVICE_PROVIDER; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_CREDENTIAL; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_TYPE; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKETS_RESOURCE; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKET_RESOURCE; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_SUBDOMAIN; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_TICKET_COMMENT_BODY_NAME; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_TICKET_PRIORITY_NAME; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_TICKET_SUBJECT_NAME; +import static 
org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_TICKET_TYPE_NAME; +import static org.apache.nifi.common.zendesk.ZendeskProperties.ZENDESK_USER; +import static org.apache.nifi.common.zendesk.util.ZendeskRecordPathUtils.addDynamicField; +import static org.apache.nifi.common.zendesk.util.ZendeskRecordPathUtils.resolveFieldValue; +import static org.apache.nifi.common.zendesk.util.ZendeskUtils.createRequestObject; +import static org.apache.nifi.common.zendesk.util.ZendeskUtils.getDynamicProperties; +import static org.apache.nifi.common.zendesk.util.ZendeskUtils.getResponseBody; + +@Tags({"zendesk", "record", "sink"}) +@CapabilityDescription("Create Zendesk tickets using the Zendesk API." + + "The service requires a Zendesk account with configured access.") +public class ZendeskRecordSink extends AbstractControllerService implements RecordSinkService { + + private final ObjectMapper mapper = new ObjectMapper(); + private Map<String, String> dynamicProperties; + private volatile RecordSetWriterFactory writerFactory; + private Cache<String, ObjectNode> recordCache; + private ZendeskClient zendeskClient; + + private String commentBody; + private String subject; + private String priority; + private String type; + + public static final PropertyDescriptor ZENDESK_TICKET_COMMENT_BODY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_COMMENT_BODY_NAME) + .displayName("Comment Body") + .description("The content or the path to the comment body in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_SUBJECT = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_SUBJECT_NAME) + .displayName("Subject") + .description("The content or the path to the subject in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + 
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_PRIORITY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_PRIORITY_NAME) + .displayName("Priority") + .description("The content or the path to the priority in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_TYPE = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_TYPE_NAME) + .displayName("Type") + .description("The content or the path to the type in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Zendesk ticket should be cached.") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder() + .name("cache-expiration") + .displayName("Cache Expiration") + .description("Specifies how long a Zendesk ticket that is cached should remain in the cache.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .required(true) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + RecordSinkService.RECORD_WRITER_FACTORY, + WEB_CLIENT_SERVICE_PROVIDER, + ZENDESK_SUBDOMAIN, + ZENDESK_USER, + ZENDESK_AUTHENTICATION_TYPE, + ZENDESK_AUTHENTICATION_CREDENTIAL, + ZENDESK_TICKET_COMMENT_BODY, + ZENDESK_TICKET_SUBJECT, + ZENDESK_TICKET_PRIORITY, + ZENDESK_TICKET_TYPE, + CACHE_SIZE, + CACHE_EXPIRATION + )); + + @Override + protected 
PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(new JsonPointerPropertyNameValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException { + final WriteResult writeResult; + List<ObjectNode> zendeskTickets = new ArrayList<>(); + + try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) { + + writer.beginRecordSet(); + Record record; + while ((record = recordSet.next()) != null) { + ObjectNode baseTicketNode = mapper.createObjectNode(); + + resolveFieldValue("/comment/body", commentBody, baseTicketNode, record); + + if (subject != null) { + resolveFieldValue("/subject", subject, baseTicketNode, record); + } + + if (priority != null) { + resolveFieldValue("/priority", priority, baseTicketNode, record); + } + if (type != null) { + resolveFieldValue("/type", type, baseTicketNode, record); + } + + for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { + addDynamicField(dynamicProperty.getKey(), dynamicProperty.getValue(), baseTicketNode, record); + } + + ObjectNode ticketNode = recordCache.getIfPresent(baseTicketNode.toString()); + if (ticketNode == null) { + recordCache.put(baseTicketNode.toString(), baseTicketNode); + zendeskTickets.add(baseTicketNode); + writer.write(record); + writer.flush(); + } + } + writeResult = writer.finishRecordSet(); + writer.flush(); + } + } catch (SchemaNotFoundException e) { + final String errorMessage = 
format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s", + recordSet.getSchema().getSchemaName()); + throw new ProcessException(errorMessage, e); + } + + if (zendeskTickets.size() > 0) { Review Comment: `Collection.isEmpty()` can be used here instead of comparing `size()` to 0, e.g. `if (!zendeskTickets.isEmpty())`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
