[ 
https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248187#comment-15248187
 ] 

ASF GitHub Bot commented on NIFI-1296:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/345#discussion_r60269808
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 ---
    @@ -0,0 +1,197 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Consumes messages from Apache Kafka")
    +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" 
})
    +public class ConsumeKafka extends 
AbstractKafkaProcessor<KafkaConsumer<byte[], byte[]>> {
    +
    +    static final AllowableValue OFFSET_EARLIEST = new 
AllowableValue("earliest", "earliest", "Automatically reset the offset to the 
earliest offset");
    +
    +    static final AllowableValue OFFSET_LATEST = new 
AllowableValue("latest", "latest", "Automatically reset the offset to the 
latest offset");
    +
    +    static final AllowableValue OFFSET_NONE = new AllowableValue("none", 
"none", "Throw exception to the consumer if no previous offset is found for the 
consumer's group");
    +
    +    static final PropertyDescriptor TOPIC = TOPIC_BUILDER
    +            .expressionLanguageSupported(false)
    --- End diff --
    
    Valid suggestion, but at this point it's all about consistency and symmetry 
to the existing Get/PutKafka and in PUT EL is enabled while in GET it is not


> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
>                 Key: NIFI-1296
>                 URL: https://issues.apache.org/jira/browse/NIFI-1296
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.4.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> Not sure when can we address this, but the interesting comment in 
> https://github.com/apache/nifi/pull/143. The usage of new API may introduce 
> issues with running against older Kafka brokers (e.g., 0.8). Need to 
> investigate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to