Hi,

A callback handler for taking corrective action on server-side failures
is in fact already available in Kafka 0.8.2. See the KafkaProducer Javadoc:
https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
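
To illustrate the idea, here is a minimal sketch of the callback pattern. In
the real client the interface is org.apache.kafka.clients.producer.Callback,
whose onCompletion(RecordMetadata, Exception) is passed to
KafkaProducer.send(record, callback). The SendCallback and FakeProducer types
below are hypothetical stand-ins so that the sketch runs without a broker;
they are not part of Kafka, and the real callback is invoked asynchronously
rather than inline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CallbackSketch {

  /** Stand-in for Kafka's Callback; the real one receives RecordMetadata, not a Long. */
  interface SendCallback {
    void onCompletion(Long offset, Exception exception);
  }

  /** Hypothetical fake producer: fails the first send of any record in failOnce. */
  static class FakeProducer {
    private final Set<String> failOnce = new HashSet<>();
    private long nextOffset = 0;

    FakeProducer(List<String> recordsToFailOnce) {
      failOnce.addAll(recordsToFailOnce);
    }

    // A real producer invokes the callback asynchronously; this fake calls it inline.
    void send(String record, SendCallback callback) {
      if (failOnce.remove(record)) {
        callback.onCompletion(null, new RuntimeException("simulated server-side failure"));
      } else {
        callback.onCompletion(nextOffset++, null);
      }
    }
  }

  /** Sends every record and returns those whose callback reported an exception. */
  static List<String> sendAll(FakeProducer producer, List<String> records) {
    List<String> failed = new ArrayList<>();
    for (String record : records) {
      producer.send(record, (offset, exception) -> {
        if (exception != null) {
          failed.add(record); // corrective action: remember the record for a retry
        }
      });
    }
    return failed;
  }

  public static void main(String[] args) {
    FakeProducer producer = new FakeProducer(Arrays.asList("b"));
    List<String> failed = sendAll(producer, Arrays.asList("a", "b", "c"));
    System.out.println("failed on first attempt: " + failed);
    System.out.println("failed on retry: " + sendAll(producer, failed));
  }
}
```

Collecting the failed records and resending them is only one possible
corrective action; an operator could just as well fail the window and let the
platform replay it.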

Thanks,
Dev



On Fri, Jul 1, 2016 at 12:48 PM, ASF GitHub Bot (JIRA) <j...@apache.org>
wrote:

>
>     [
> https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359534#comment-15359534
> ]
>
> ASF GitHub Bot commented on APEXMALHAR-2086:
> --------------------------------------------
>
> Github user sandeshh commented on a diff in the pull request:
>
>     https://github.com/apache/apex-malhar/pull/298#discussion_r69347651
>
>     --- Diff:
> kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
> ---
>     @@ -0,0 +1,401 @@
>     +/**
>     + * Licensed to the Apache Software Foundation (ASF) under one
>     + * or more contributor license agreements.  See the NOTICE file
>     + * distributed with this work for additional information
>     + * regarding copyright ownership.  The ASF licenses this file
>     + * to you under the Apache License, Version 2.0 (the
>     + * "License"); you may not use this file except in compliance
>     + * with the License.  You may obtain a copy of the License at
>     + *
>     + *   http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing,
>     + * software distributed under the License is distributed on an
>     + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>     + * KIND, either express or implied.  See the License for the
>     + * specific language governing permissions and limitations
>     + * under the License.
>     + */
>     +
>     +package org.apache.apex.malhar.kafka;
>     +
>     +import java.io.IOException;
>     +import java.util.ArrayList;
>     +import java.util.HashMap;
>     +import java.util.List;
>     +import java.util.Map;
>     +import java.util.Properties;
>     +import java.util.concurrent.ExecutionException;
>     +
>     +import org.slf4j.Logger;
>     +import org.slf4j.LoggerFactory;
>     +
>     +import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
>     +import org.apache.apex.malhar.lib.wal.WindowDataManager;
>     +
>     +import org.apache.kafka.clients.consumer.ConsumerRecord;
>     +import org.apache.kafka.clients.consumer.ConsumerRecords;
>     +import org.apache.kafka.clients.consumer.KafkaConsumer;
>     +import org.apache.kafka.clients.producer.Callback;
>     +import org.apache.kafka.clients.producer.ProducerRecord;
>     +import org.apache.kafka.clients.producer.RecordMetadata;
>     +
>     +import org.apache.kafka.common.PartitionInfo;
>     +import org.apache.kafka.common.TopicPartition;
>     +
>     +import com.datatorrent.api.Context;
>     +import com.datatorrent.api.DefaultInputPort;
>     +import com.datatorrent.api.Operator;
>     +
>     +import static
> org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
>     +import static
> org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
>     +import static
> org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
>     +
>     +/**
>     + * Kafka output operator with exactly once processing semantics.
>     --- End diff --
>
>     @tweise & @siyuanh, I have updated the JavaDoc. Please take a look.
>
>
> > Kafka Output Operator with Kafka 0.9 API
> > ----------------------------------------
> >
> >                 Key: APEXMALHAR-2086
> >                 URL:
> https://issues.apache.org/jira/browse/APEXMALHAR-2086
> >             Project: Apache Apex Malhar
> >          Issue Type: New Feature
> >            Reporter: Sandesh
> >            Assignee: Sandesh
> >
> > Goal: 2 operators for Kafka output
> >       1. Simple Kafka Output Operator
> >             - Supports at-least-once
> >             - Exposes the most-used producer properties as class properties
> >       2. Exactly-once Kafka Output (not possible in all cases; will be
> > documented later)
> >
> > Design for Exactly Once
> > Window Data Manager - Stores the Kafka partition offsets.
> > Kafka Key - Used by the operator = AppID#OperatorId
> > During recovery, a partially written window is re-created using the
> > following approach:
> > Tuples between the largest recovery offsets and the current offset are
> > checked. Based on the key, tuples written by other entities are
> > discarded.
> > Only tuples which are not in the recovered set are emitted.
> > Tuples need to be unique within the window.
> >
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>
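
For reference, the exactly-once recovery approach described in the quoted
issue could be sketched roughly as follows. This is not the code from the pull
request. It assumes a single partition modelled as an in-memory list, string
tuples, and hypothetical names (Message, alreadyWritten, tuplesToEmit); the
actual operator reads the tuples back with a Kafka consumer and tracks offsets
per partition through the Window Data Manager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RecoverySketch {

  /** Hypothetical stand-in for a record read back from the topic. */
  static final class Message {
    final String key;
    final String value;

    Message(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Scans the log from the recovered offset to its end and collects the
   * tuples this operator already wrote. Messages whose key differs from
   * operatorKey (AppID#OperatorId) come from other entities and are skipped.
   */
  static Set<String> alreadyWritten(List<Message> log, int recoveredOffset, String operatorKey) {
    Set<String> written = new HashSet<>();
    for (int i = recoveredOffset; i < log.size(); i++) {
      Message m = log.get(i);
      if (operatorKey.equals(m.key)) {
        written.add(m.value);
      }
    }
    return written;
  }

  /** Keeps only the replayed tuples that are not in the recovered set. */
  static List<String> tuplesToEmit(List<String> replayedWindow, Set<String> written) {
    List<String> toEmit = new ArrayList<>();
    for (String tuple : replayedWindow) {
      if (!written.contains(tuple)) {
        toEmit.add(tuple);
      }
    }
    return toEmit;
  }

  public static void main(String[] args) {
    List<Message> log = Arrays.asList(
        new Message("app1#1", "a"),   // offset 0: from an earlier, committed window
        new Message("app1#1", "b"),   // offset 1: partially written window starts here
        new Message("other#2", "d"),  // written by another entity, ignored
        new Message("app1#1", "c"));
    Set<String> written = alreadyWritten(log, 1, "app1#1");
    System.out.println(tuplesToEmit(Arrays.asList("b", "c", "d", "e"), written));
  }
}
```

Because the recovered tuples are held in a set, a duplicate within the window
would be suppressed on replay, which is why the design requires tuples to be
unique within the window.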
