Hi,

Actually, a callback handler for taking corrective action in case of server-side failures is available in 0.8.2:
https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
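For illustration, here is a minimal sketch of the callback pattern. With the real client, the callback is passed as the second argument to KafkaProducer.send(record, callback) and receives onCompletion(RecordMetadata, Exception). To keep the sketch runnable without a broker, the types below (Metadata, SendCallback, fakeSend) are local stand-ins that only mirror that shape. The "queue for retry" corrective action is an assumption for the example, not something the operator in the PR does.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins so the sketch runs without a Kafka broker. They mirror the shape of
// org.apache.kafka.clients.producer.Callback but are NOT the Kafka classes.
class CallbackSketch {

    /** Stand-in for RecordMetadata: where the record ended up. */
    static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Stand-in for Callback: a non-null exception signals that the send failed. */
    interface SendCallback {
        void onCompletion(Metadata metadata, Exception exception);
    }

    /** Hypothetical corrective action: remember failed payloads so they can be re-sent. */
    static final class RetryingCallback implements SendCallback {
        private final String payload;
        private final List<String> retryQueue;

        RetryingCallback(String payload, List<String> retryQueue) {
            this.payload = payload;
            this.retryQueue = retryQueue;
        }

        @Override
        public void onCompletion(Metadata metadata, Exception exception) {
            if (exception != null) {
                // Server-side failure: queue the payload for another attempt.
                retryQueue.add(payload);
            }
        }
    }

    /** Fake sender: invokes the callback with an error when asked to fail. */
    static void fakeSend(String payload, boolean fail, long offset, SendCallback callback) {
        if (fail) {
            callback.onCompletion(null, new RuntimeException("simulated server-side failure"));
        } else {
            callback.onCompletion(new Metadata("demo-topic", 0, offset), null);
        }
    }

    public static void main(String[] args) {
        List<String> retryQueue = new ArrayList<>();
        fakeSend("a", false, 0L, new RetryingCallback("a", retryQueue));
        fakeSend("b", true, -1L, new RetryingCallback("b", retryQueue));
        System.out.println("to retry: " + retryQueue);
    }
}
```

In the real producer the callback runs on the producer's I/O thread, so any corrective action there should be cheap and non-blocking.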
Thanks,
Dev

On Fri, Jul 1, 2016 at 12:48 PM, ASF GitHub Bot (JIRA) <j...@apache.org> wrote:

>
> [ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359534#comment-15359534 ]
>
> ASF GitHub Bot commented on APEXMALHAR-2086:
> --------------------------------------------
>
> Github user sandeshh commented on a diff in the pull request:
>
> https://github.com/apache/apex-malhar/pull/298#discussion_r69347651
>
> --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---
> @@ -0,0 +1,401 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package org.apache.apex.malhar.kafka;
> +
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Properties;
> +import java.util.concurrent.ExecutionException;
> +
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
> +import org.apache.apex.malhar.lib.wal.WindowDataManager;
> +
> +import org.apache.kafka.clients.consumer.ConsumerRecord;
> +import org.apache.kafka.clients.consumer.ConsumerRecords;
> +import org.apache.kafka.clients.consumer.KafkaConsumer;
> +import org.apache.kafka.clients.producer.Callback;
> +import org.apache.kafka.clients.producer.ProducerRecord;
> +import org.apache.kafka.clients.producer.RecordMetadata;
> +
> +import org.apache.kafka.common.PartitionInfo;
> +import org.apache.kafka.common.TopicPartition;
> +
> +import com.datatorrent.api.Context;
> +import com.datatorrent.api.DefaultInputPort;
> +import com.datatorrent.api.Operator;
> +
> +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
> +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
> +
> +/**
> + * Kafka output operator with exactly once processing semantics.
> --- End diff --
>
> @tweise & @siyuanh updated the JavaDoc. Please take a look.
>
>
> > Kafka Output Operator with Kafka 0.9 API
> > ----------------------------------------
> >
> > Key: APEXMALHAR-2086
> > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
> > Project: Apache Apex Malhar
> > Issue Type: New Feature
> > Reporter: Sandesh
> > Assignee: Sandesh
> >
> > Goal: 2 Operators for Kafka Output
> > 1. Simple Kafka Output Operator
> >    - Supports At Least Once
> >    - Expose most used producer properties as class properties
> > 2. Exactly Once Kafka Output (Not possible in all the cases, will be documented later)
> >
> > Design for Exactly Once
> > Window Data Manager - Stores the Kafka partitions offsets.
> > Kafka Key - Used by the operator = AppID#OperatorId
> > During recovery, the partially written window is re-created using the following approach:
> > Tuples between the largest recovery offsets and the current offset are checked. Based on the key, tuples written by the other entities are discarded.
> > Only tuples which are not in the recovered set are emitted.
> > Tuples need to be unique within the window.
> >
> >
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
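
The recovery approach in the quoted design can be sketched roughly as follows. This is a simplified illustration, not the code in the pull request. It assumes a single partition modelled as an in-memory list, it treats the stored recovery offset as the first offset to re-check, and the names Rec, recoverWritten and tuplesToEmit are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, in-memory sketch of the "Design for Exactly Once" recovery step.
// Assumptions: one partition, the topic is modelled as a List, and all names are hypothetical.
class ExactlyOnceRecoverySketch {

    /** One record in the topic: its offset, the writer's key (AppID#OperatorId), and the tuple. */
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Scans records from the recovery offset to the end of the log and collects the tuples
     * this operator already wrote. Records whose key belongs to another writer are discarded.
     */
    static Set<String> recoverWritten(List<Rec> log, long recoveryOffset, String ownKey) {
        Set<String> written = new HashSet<>();
        for (Rec r : log) {
            if (r.offset >= recoveryOffset && ownKey.equals(r.key)) {
                written.add(r.value);
            }
        }
        return written;
    }

    /** Emits only the replayed tuples that are not in the recovered set. */
    static List<String> tuplesToEmit(List<String> replayedWindow, Set<String> alreadyWritten) {
        List<String> out = new ArrayList<>();
        for (String tuple : replayedWindow) {
            if (!alreadyWritten.contains(tuple)) {
                out.add(tuple);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec(0, "app1#1", "a"));
        log.add(new Rec(1, "app1#1", "b"));
        log.add(new Rec(2, "app1#1", "c")); // written before the failure, after the recovery offset
        log.add(new Rec(3, "other#7", "d")); // written by another entity, ignored
        log.add(new Rec(4, "app1#1", "e"));

        Set<String> written = recoverWritten(log, 2L, "app1#1");
        List<String> window = new ArrayList<>();
        window.add("c");
        window.add("d");
        window.add("e");
        window.add("f");
        System.out.println(tuplesToEmit(window, written));
    }
}
```

Because membership in the recovered set is the only check, two identical tuples in one window would be indistinguishable, which is why the design requires tuples to be unique within the window.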