[ https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15016683#comment-15016683 ]
ASF GitHub Bot commented on FLINK-2974:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1341#discussion_r45462208
--- Diff: flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---
@@ -26,24 +26,42 @@
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-public class MockRuntimeContext implements RuntimeContext {
+public class MockRuntimeContext extends StreamingRuntimeContext {
private final int numberOfParallelSubtasks;
private final int indexOfThisSubtask;
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+ super(new MockStreamOperator(),
+ new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+ Collections.<String, Accumulator<?, ?>>emptyMap());
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.indexOfThisSubtask = indexOfThisSubtask;
}
+ public static class MockStreamOperator extends AbstractStreamOperator {
--- End diff ---
Can probably be private; no one else uses this, if I see it correctly...
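
For illustration, a minimal sketch of the suggested tweak, reconstructed only from the hunk above (the body of MockStreamOperator beyond its declaration is not visible in the diff, so an empty body is assumed here, and the remaining overrides of the original class are omitted):

import java.util.Collections;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public class MockRuntimeContext extends StreamingRuntimeContext {

    private final int numberOfParallelSubtasks;
    private final int indexOfThisSubtask;

    public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
        // constructor call copied from the hunk above
        super(new MockStreamOperator(),
                new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
                Collections.<String, Accumulator<?, ?>>emptyMap());
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
        this.indexOfThisSubtask = indexOfThisSubtask;
    }

    // (remaining overrides of the original class omitted)

    // visibility narrowed from public to private, as suggested in the review
    // comment; only MockRuntimeContext's constructor instantiates it
    private static class MockStreamOperator extends AbstractStreamOperator {
        private static final long serialVersionUID = 1L;
    }
}
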
> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
> Key: FLINK-2974
> URL: https://issues.apache.org/jira/browse/FLINK-2974
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
> Assignee: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is
> enabled.
> We should offer a feature similar to Kafka's autocommit in our consumer.
> Issue reported by user:
> http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages
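
As a rough illustration of the requested behavior (a hypothetical sketch, not the implementation in the pull request; the names PeriodicOffsetCommitter, OffsetCommitHandler, and reportOffset are invented for this example), a periodic committer could mirror Kafka's autocommit by flushing the consumer's current offsets on a fixed interval:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicOffsetCommitter implements AutoCloseable {

    /** Callback that performs the actual commit, e.g. writing offsets to ZooKeeper. */
    public interface OffsetCommitHandler {
        void commit(Map<Integer, Long> partitionOffsets) throws Exception;
    }

    private final Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "periodic-offset-committer");
                t.setDaemon(true); // do not keep the job alive just for commits
                return t;
            });

    public PeriodicOffsetCommitter(long commitIntervalMillis, OffsetCommitHandler handler) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                if (!currentOffsets.isEmpty()) {
                    // commit a snapshot of the offsets seen so far
                    handler.commit(new HashMap<>(currentOffsets));
                }
            } catch (Exception e) {
                // a failed commit only affects the start position after a restart,
                // so log and retry on the next interval (logging omitted here)
            }
        }, commitIntervalMillis, commitIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Called by the fetcher after a record has been emitted downstream. */
    public void reportOffset(int partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

Such a committer would only be started when checkpointing is disabled; with checkpointing enabled, the existing offset handling described above applies.
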
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)