[ 
https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693595#comment-16693595
 ] 

ASF GitHub Bot commented on NIFI-4914:
--------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3178#discussion_r235108402
  
    --- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java
 ---
    @@ -0,0 +1,324 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +
    +@CapabilityDescription("Consumes messages from Apache Pulsar. "
    +        + "The complementary NiFi processor for sending messages is 
PublishPulsarRecord. Please note that, at this time, "
    +        + "the Processor assumes that all records that are retrieved have 
the same schema. If any of the Pulsar messages "
    +        + "that are pulled but cannot be parsed or written with the 
configured Record Reader or Record Writer, the contents "
    +        + "of the message will be written to a separate FlowFile, and that 
FlowFile will be transferred to the 'parse.failure' "
    +        + "relationship. Otherwise, each FlowFile is sent to the 'success' 
relationship and may contain many individual "
    +        + "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
    +        + "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
    +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The number 
of records received")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({PublishPulsar.class, ConsumePulsar.class, 
PublishPulsarRecord.class})
    +public class ConsumePulsarRecord extends 
AbstractPulsarConsumerProcessor<byte[]> {
    +
    +    public static final String MSG_COUNT = "record.count";
    +
    +    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
    +            .name("Record Reader")
    +            .displayName("Record Reader")
    +            .description("The Record Reader to use for incoming FlowFiles")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("Record Writer")
    +            .displayName("Record Writer")
    +            .description("The Record Writer to use in order to serialize 
the data before sending to Pulsar")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The number of records to combine into a single 
flow file.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_WAIT_TIME = new 
PropertyDescriptor.Builder()
    +            .name("Max Wait Time")
    +            .description("The maximum amount of time allowed for a Pulsar 
consumer to poll a subscription for data "
    +                    + ", zero means there is no limit. Max time less than 
1 second will be equal to zero.")
    +            .defaultValue("2 seconds")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
    +            .name("parse_failure")
    +            .description("FlowFiles for which the content cannot be 
parsed.")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES;
    +    private static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(BATCH_SIZE);
    +        properties.add(MAX_WAIT_TIME);
    +        properties.addAll(AbstractPulsarConsumerProcessor.PROPERTIES);
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_PARSE_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER)
    +                .asControllerService(RecordReaderFactory.class);
    +
    +        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER)
    +                .asControllerService(RecordSetWriterFactory.class);
    +
    +        try {
    +            Consumer<byte[]> consumer = getConsumer(context, 
getConsumerId(context, session.get()));
    +
    +            if (consumer == null) { /* If we aren't connected to Pulsar, 
then just yield */
    +                context.yield();
    +                return;
    +            }
    +
    +            if (context.getProperty(ASYNC_ENABLED).isSet() && 
context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +               consumeAsync(consumer, context, session);
    +               handleAsync(context, session, consumer, readerFactory, 
writerFactory);
    +            } else {
    +               consumeMessage(session, consumer, consumer.receive(), 
readerFactory, writerFactory);
    +            }
    +        } catch (PulsarClientException e) {
    +            getLogger().error("Unable to consume from Pulsar Topic ", e);
    +            context.yield();
    +            throw new ProcessException(e);
    +        }
    +    }
    +
    +    private void consumeMessage(ProcessSession session, final 
Consumer<byte[]> consumer, final Message<byte[]> msg,
    +            final RecordReaderFactory readerFactory, 
RecordSetWriterFactory writerFactory) throws PulsarClientException {
    +        consumeMessage(session, consumer, msg, false, readerFactory, 
writerFactory);
    +    }
    +
    +    /**
    +     * Consumes a single message, parses it, and returns.
    +     */
    +    private void consumeMessage(ProcessSession session, final 
Consumer<byte[]> consumer, final Message<byte[]> msg, boolean async,
    +         final RecordReaderFactory readerFactory, RecordSetWriterFactory 
writerFactory) throws PulsarClientException {
    +
    +       RecordSetWriter writer = null;
    +       FlowFile flowFile = session.create();
    +       OutputStream rawOut = null;
    +       RecordReader reader = getRecordReader(msg, readerFactory, session);
    +       Record firstRecord = getFirstRecord(msg, reader, session);
    +
    +       // We have a message that we cannot parse.
    +       if (firstRecord == null) {
    +            if (!async) {
    +              consumer.acknowledge(msg);
    +            } else {
    +             getAckService().submit(new Callable<Object>() {
    +                 @Override
    +                 public Object call() throws Exception {
    +                    return consumer.acknowledgeAsync(msg).get();
    --- End diff --
    
    are we only pulling a single message from pulsar each time?  We should pull 
a ton of them...and write out the records.  It looks like this is a single 
pulsar message but then we try to extract multiple records from it.  That is a 
fine path but the performant path will be to have many messages turned into a 
large set of records that go in a single flow file and repeat.


> Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, 
> PublishPulsarRecord
> ------------------------------------------------------------------------------------------
>
>                 Key: NIFI-4914
>                 URL: https://issues.apache.org/jira/browse/NIFI-4914
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>            Reporter: David Kjerrumgaard
>            Priority: Minor
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Create record-based processors for Apache Pulsar 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to