[
https://issues.apache.org/jira/browse/NIFI-865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15128577#comment-15128577
]
ASF GitHub Bot commented on NIFI-865:
-------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/200#discussion_r51599615
--- Diff:
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+/**
+ * Publishing AMQP processor which upon each invocation of
+ * {@link #onTrigger(ProcessContext, ProcessSession)} method will
construct an
+ * AMQP message sending it to an exchange identified during construction
of this
+ * class while transferring the incoming {@link FlowFile} to 'success'
+ * {@link Relationship}.
+ *
+ * Expects that queues, exchanges and bindings are pre-defined by an AMQP
+ * administrator
+ */
+@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Creates a AMQP Message from the contents of a
FlowFile and sends the message to an AMQP Server")
+public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
+
+ public static final PropertyDescriptor EXCHANGE = new
PropertyDescriptor.Builder()
+ .name("Exchange Name")
+ .description("The name of the AMQP Exchange the messages will
be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
+ + "It is an optional property. If kept empty the
messages will be sent to a default AMQP exchange.")
+ .required(true)
+ .defaultValue("")
+ .addValidator(Validator.VALID)
+ .build();
+ public static final PropertyDescriptor ROUNTING_KEY = new
PropertyDescriptor.Builder()
+ .name("Routing Key")
+ .description("The name of the Routing Key that will be used by
AMQP to route messages from the exchange to a destination queue(s). "
+ + "Usually provided by the administrator (e.g.,
'myKey')In the event when messages are sent to a default exchange this property
"
+ + "corresponds to a destination queue name, otherwise
a binding from the Exchange to a Queue via Routing Key must be set "
+ + "(usually by the AMQP administrator)")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that are sent to the AMQP
destination are routed to this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("All FlowFiles that cannot be routed to the AMQP
destination are routed to this relationship")
+ .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ private final static Set<Relationship> relationships;
+
+ private final static List<String> amqpPropertyNames =
AMQPUtils.getAmqpPropertyNames();
+
+ /*
+ * Will ensure that the list of property descriptors is build only
once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(EXCHANGE);
+ _propertyDescriptors.add(ROUNTING_KEY);
+ _propertyDescriptors.addAll(descriptors);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ /**
+ * Will construct AMQP message by extracting its body from the incoming
+ * {@link FlowFile}. AMQP {@link Properties} will be extracted from the
+ * {@link FlowFile} and converted to {@link BasicProperties} to be sent
+ * along with the message. Upon success the incoming {@link FlowFile}
is
+ * transfered to 'success' {@link Relationship} and upon failure
FlowFile is
+ * penalized and transfered to the 'failure' {@link Relationship}
+ * <br>
+ * NOTE: Attributes extracted from {@link FlowFile} are considered
+ * candidates for AMQP properties if their names are prefixed with
+ * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
+ *
+ */
+ @Override
+ protected void rendezvousWithAmqp(ProcessContext context,
ProcessSession processSession) throws ProcessException {
+ FlowFile flowFile = processSession.get();
+ if (flowFile != null) {
+ BasicProperties amqpProperties =
this.extractAmqpPropertiesFromFlowFile(flowFile);
+
+ byte[] messageContent = this.extractMessage(flowFile,
processSession);
+
+ try {
+ this.targetResource.publish(messageContent,
amqpProperties);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ processSession.transfer(processSession.penalize(flowFile),
REL_FAILURE);
+ processSession.getProvenanceReporter().receive(flowFile,
+ this.amqpConnection.toString() + "/E:" +
context.getProperty(EXCHANGE).getValue() + "/RK:"
+ +
context.getProperty(ROUNTING_KEY).getValue());
+ this.getLogger().error("Failed while sending message to
AMQP via " + this.targetResource, e);
+ context.yield();
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ /**
+ *
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+ * Will create an instance of {@link AMQPPublisher}
+ */
+ @Override
+ protected AMQPPublisher finishBuildingTargetResource(ProcessContext
context) {
+ String exchangeName = context.getProperty(EXCHANGE).getValue();
+ String routingKey = context.getProperty(ROUNTING_KEY).getValue();
+ return new AMQPPublisher(this.amqpConnection, exchangeName,
routingKey, this.getLogger());
+ }
+
+ /**
+ * Extracts contents of the {@link FlowFile} as byte array.
+ */
+ private byte[] extractMessage(FlowFile flowFile, ProcessSession
session){
+ final byte[] messageContent = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, messageContent, true);
+ }
+ });
+ return messageContent;
+ }
+
+ /**
+ * Extracts AMQP properties from the {@link FlowFile} attributes.
Attributes
+ * extracted from {@link FlowFile} are considered candidates for AMQP
+ * properties if their names are prefixed with
+ * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
+ */
+ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile
flowFile) {
+ Map<String, String> attributes = flowFile.getAttributes();
+ AMQP.BasicProperties.Builder builder = new
AMQP.BasicProperties.Builder();
+ for (Entry<String, String> attributeEntry : attributes.entrySet())
{
+ if
(attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
+ String amqpPropName = attributeEntry.getKey().split("\\" +
AMQPUtils.AMQP_PROP_DELIMITER)[1];
+ String amqpPropValue = attributeEntry.getValue();
+ System.out.println(amqpPropertyNames);
+ try {
+ if
(amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
+ Method m =
builder.getClass().getDeclaredMethod(amqpPropName, String.class);
+ m.invoke(builder, amqpPropValue);
+ } else {
+ getLogger().warn("Unrecogninsed AMQP property '" +
amqpPropName + "', will ignore.");
+ }
+ } catch (Exception e) {
+ // should really never happen since it should be
caught by
+ // the above IF.
+ getLogger().warn("Failed while tryinhg to build AMQP
Properties.", e);
--- End diff --
Typo in the log message: "tryinhg" should be "trying".
> Add processors to Get and Put to/from AMQP-based messaging systems
> ------------------------------------------------------------------
>
> Key: NIFI-865
> URL: https://issues.apache.org/jira/browse/NIFI-865
> Project: Apache NiFi
> Issue Type: Wish
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Oleg Zhurakousky
> Labels: beginner
> Fix For: 0.5.0
>
>
> David Smith on the mailing list has already begun work on this. The link to
> his current code on GitHub is here:
> https://github.com/helicopterman22/nifi_amqp_processors
> He is seeking help with licensing, unit testing, etc., so that it can end up in
> a build.
> We have word from another user now that they too are interested in this.
> They are specifically using RabbitMQ, but we should (ideally) be able to
> implement a pure AMQP-protocol-friendly version and be good to go.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)