[ 
https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15016683#comment-15016683
 ] 

ASF GitHub Bot commented on FLINK-2974:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45462208
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 ---
    @@ -26,24 +26,42 @@
     import org.apache.flink.api.common.accumulators.LongCounter;
     import org.apache.flink.api.common.cache.DistributedCache;
     import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
    -import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.api.common.state.OperatorState;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
     
     import java.io.Serializable;
    +import java.util.Collections;
     import java.util.List;
     import java.util.Map;
     
    -public class MockRuntimeContext implements RuntimeContext {
    +public class MockRuntimeContext extends StreamingRuntimeContext {
     
        private final int numberOfParallelSubtasks;
        private final int indexOfThisSubtask;
     
        public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
    +           super(new MockStreamOperator(),
    +                           new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
    +                           Collections.<String, Accumulator<?, 
?>>emptyMap());
                this.numberOfParallelSubtasks = numberOfParallelSubtasks;
                this.indexOfThisSubtask = indexOfThisSubtask;
        }
     
    +   public static class MockStreamOperator extends AbstractStreamOperator {
    --- End diff --
    
    Can probably be private, no one else uses this, if I see it correctly...


> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
>                 Key: FLINK-2974
>                 URL: https://issues.apache.org/jira/browse/FLINK-2974
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is 
> enabled.
> We should have a similar feature to Kafka's autocommit in our consumer.
> Issue reported by user: 
> http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to