[ 
https://issues.apache.org/jira/browse/NIFI-865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15128586#comment-15128586
 ] 

ASF GitHub Bot commented on NIFI-865:
-------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/200#discussion_r51600206
  
    --- Diff: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
 ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.amqp.processors;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.rabbitmq.client.AMQP;
    +import com.rabbitmq.client.AMQP.BasicProperties;
    +
    +/**
    + * Publishing AMQP processor which upon each invocation of
    + * {@link #onTrigger(ProcessContext, ProcessSession)} method will 
construct an
    + * AMQP message sending it to an exchange identified during construction 
of this
    + * class while transferring the incoming {@link FlowFile} to 'success'
    + * {@link Relationship}.
    + *
    + * Expects that queues, exchanges and bindings are pre-defined by an AMQP
    + * administrator
    + */
    +@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Creates a AMQP Message from the contents of a 
FlowFile and sends the message to an AMQP Server")
    +public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
    +
    +    public static final PropertyDescriptor EXCHANGE = new 
PropertyDescriptor.Builder()
    +            .name("Exchange Name")
    +            .description("The name of the AMQP Exchange the messages will 
be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
    +                    + "It is an optional property. If kept empty the 
messages will be sent to a default AMQP exchange.")
    +            .required(true)
    +            .defaultValue("")
    +            .addValidator(Validator.VALID)
    +            .build();
    +    public static final PropertyDescriptor ROUNTING_KEY = new 
PropertyDescriptor.Builder()
    +            .name("Routing Key")
    +            .description("The name of the Routing Key that will be used by 
AMQP to route messages from the exchange to a destination queue(s). "
    +                    + "Usually provided by the administrator (e.g., 
'myKey')In the event when messages are sent to a default exchange this property 
"
    +                    + "corresponds to a destination queue name, otherwise 
a binding from the Exchange to a Queue via Routing Key must be set "
    +                    + "(usually by the AMQP administrator)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are sent to the AMQP 
destination are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("All FlowFiles that cannot be routed to the AMQP 
destination are routed to this relationship")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    private final static Set<Relationship> relationships;
    +
    +    private final static List<String> amqpPropertyNames = 
AMQPUtils.getAmqpPropertyNames();
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only 
once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(EXCHANGE);
    +        _propertyDescriptors.add(ROUNTING_KEY);
    +        _propertyDescriptors.addAll(descriptors);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Will construct AMQP message by extracting its body from the incoming
    +     * {@link FlowFile}. AMQP {@link Properties} will be extracted from the
    +     * {@link FlowFile} and converted to {@link BasicProperties} to be sent
    +     * along with the message. Upon success the incoming {@link FlowFile} 
is
    +     * transfered to 'success' {@link Relationship} and upon failure 
FlowFile is
    +     * penalized and transfered to the 'failure' {@link Relationship}
    +     * <br>
    +     * NOTE: Attributes extracted from {@link FlowFile} are considered
    +     * candidates for AMQP properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithAmqp(ProcessContext context, 
ProcessSession processSession) throws ProcessException {
    +        FlowFile flowFile = processSession.get();
    +        if (flowFile != null) {
    +            BasicProperties amqpProperties = 
this.extractAmqpPropertiesFromFlowFile(flowFile);
    +
    +            byte[] messageContent = this.extractMessage(flowFile, 
processSession);
    +
    +            try {
    +                this.targetResource.publish(messageContent, 
amqpProperties);
    +                processSession.transfer(flowFile, REL_SUCCESS);
    +            } catch (Exception e) {
    +                processSession.transfer(processSession.penalize(flowFile), 
REL_FAILURE);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        this.amqpConnection.toString() + "/E:" + 
context.getProperty(EXCHANGE).getValue() + "/RK:"
    +                                + 
context.getProperty(ROUNTING_KEY).getValue());
    +                this.getLogger().error("Failed while sending message to 
AMQP via " + this.targetResource, e);
    +                context.yield();
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Will create an instance of {@link AMQPPublisher}
    +     */
    +    @Override
    +    protected AMQPPublisher finishBuildingTargetResource(ProcessContext 
context) {
    +        String exchangeName = context.getProperty(EXCHANGE).getValue();
    +        String routingKey = context.getProperty(ROUNTING_KEY).getValue();
    +        return new AMQPPublisher(this.amqpConnection, exchangeName, 
routingKey, this.getLogger());
    +    }
    +
    +    /**
    +     * Extracts contents of the {@link FlowFile} as byte array.
    +     */
    +    private byte[] extractMessage(FlowFile flowFile, ProcessSession 
session){
    +        final byte[] messageContent = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, messageContent, true);
    +            }
    +        });
    +        return messageContent;
    +    }
    +
    +    /**
    +     * Extracts AMQP properties from the {@link FlowFile} attributes. 
Attributes
    +     * extracted from {@link FlowFile} are considered candidates for AMQP
    +     * properties if their names are prefixed with
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     */
    +    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile) {
    +        Map<String, String> attributes = flowFile.getAttributes();
    +        AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
    +        for (Entry<String, String> attributeEntry : attributes.entrySet()) 
{
    +            if 
(attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
    +                String amqpPropName = attributeEntry.getKey().split("\\" + 
AMQPUtils.AMQP_PROP_DELIMITER)[1];
    +                String amqpPropValue = attributeEntry.getValue();
    +                System.out.println(amqpPropertyNames);
    --- End diff --
    
    oops


> Add processors to Get and Put to/from AMQP-based messaging systems
> ------------------------------------------------------------------
>
>                 Key: NIFI-865
>                 URL: https://issues.apache.org/jira/browse/NIFI-865
>             Project: Apache NiFi
>          Issue Type: Wish
>          Components: Extensions
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>              Labels: beginner
>             Fix For: 0.5.0
>
>
> David Smith in the mailing list has already begun work on this.  The link to 
> his current set of code in Github is here:  
> https://github.com/helicopterman22/nifi_amqp_processors
> He is seeking help with licensing/unit testing/etc.. so that it can end up in 
> a build.
> We have word from another user now that they too are interested in this.  
> They specifically are using RabbitMQ but we should be able to implement 
> (ideally) a pure AMQP protocol friendly version and be good to go.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to