Repository: flink
Updated Branches:
  refs/heads/master ac43a69c1 -> 033409190
[FLINK-2004] Fix memory leak in presence of failed checkpoints in Kafka source

This closes #674


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d294735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d294735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d294735

Branch: refs/heads/master
Commit: 7d294735eb3a77debbe2215c44dea4f97bcf7165
Parents: ac43a69
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu May 14 11:45:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jun 1 17:00:02 2015 +0200

----------------------------------------------------------------------
 .../api/persistent/PersistentKafkaSource.java   | 77 +++++++++++++-----
 .../streaming/connectors/kafka/KafkaITCase.java | 86 +++++++++++++++++++-
 .../streaming/api/checkpoint/Checkpointed.java  |  4 +-
 3 files changed, 140 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 032ed08..84fd7b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -50,7 +51,6 @@ import java.io.ObjectOutputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -65,20 +65,26 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		ResultTypeQueryable<OUT>,
 		CheckpointCommitter,
 		CheckpointedAsynchronously<long[]> {
+
+	private static final long serialVersionUID = 287845877188312621L;
+
 	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+
+	private final String topicName;
+	private final DeserializationSchema<OUT> deserializationSchema;
+
 	protected transient ConsumerConfig consumerConfig;
 	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
 	private transient ConsumerConnector consumer;
-
-	private String topicName;
-	private DeserializationSchema<OUT> deserializationSchema;
-	private boolean running = true;
-
-	private transient long[] lastOffsets;
+	private transient ZkClient zkClient;
+	private transient long[] lastOffsets;
 	private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
-
+	private transient long[] restoreState;
+
+	private final LinkedMap pendingCheckpoints = new LinkedMap();
+
 	// We set this in reachedEnd to carry it over to next()
 	private OUT nextElement = null;
@@ -133,19 +139,33 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		this.consumer = consumer;
 
 		zkClient = new ZkClient(consumerConfig.zkConnect(),
-			consumerConfig.zkSessionTimeoutMs(),
-			consumerConfig.zkConnectionTimeoutMs(),
-			new KafkaZKStringSerializer());
+				consumerConfig.zkSessionTimeoutMs(),
+				consumerConfig.zkConnectionTimeoutMs(),
+				new KafkaZKStringSerializer());
 
 		// most likely the number of offsets we're going to store here will be lower than the number of partitions.
 		int numPartitions = getNumberOfPartitions();
 		LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
 		this.lastOffsets = new long[numPartitions];
 		this.commitedOffsets = new long[numPartitions];
-		Arrays.fill(this.lastOffsets, -1);
-		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+
+		// check if there are offsets to restore
+		if (restoreState != null) {
+			if (restoreState.length != numPartitions) {
+				throw new IllegalStateException("There are "+restoreState.length+" offsets to restore for topic "+topicName+" but " +
+						"there are only "+numPartitions+" in the topic");
+			}
+			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(restoreState));
+			setOffsetsInZooKeeper(restoreState);
+			this.lastOffsets = restoreState;
+		} else {
+			// initialize empty offsets
+			Arrays.fill(this.lastOffsets, -1);
+		}
+		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+		nextElement = null;
+		pendingCheckpoints.clear();
 	}
 
 	@Override
@@ -200,10 +220,9 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	// ---------------------- State / Checkpoint handling -----------------
 	// this source is keeping the partition offsets in Zookeeper
 
-	private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
-
 	@Override
 	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+
 		if(lastOffsets == null) {
 			LOG.warn("State snapshot requested on not yet opened source. Returning null");
 			return null;
 		}
@@ -217,7 +236,8 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 
 	@Override
 	public void restoreState(long[] state) {
-		// we maintain the offsets in Kafka, so nothing to do.
+ LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state)); + this.restoreState = Arrays.copyOf(state, state.length); } @@ -228,15 +248,28 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> @Override public void commitCheckpoint(long checkpointId) { LOG.info("Commit checkpoint {}", checkpointId); - long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId); - if(checkpointOffsets == null) { + final int posInMap = pendingCheckpoints.indexOf(checkpointId); + if(posInMap == -1) { LOG.warn("Unable to find pending checkpoint for id {}", checkpointId); return; } - LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets)); - for(int partition = 0; partition < checkpointOffsets.length; partition++) { - long offset = checkpointOffsets[partition]; + long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap); + LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets)); + + // remove older checkpoints in map: + if(!pendingCheckpoints.isEmpty()) { + for(int i = 0; i < posInMap; i++) { + pendingCheckpoints.remove(0); + } + } + + setOffsetsInZooKeeper(checkpointOffsets); + } + + private void setOffsetsInZooKeeper(long[] offsets) { + for(int partition = 0; partition < offsets.length; partition++) { + long offset = offsets[partition]; if(offset != -1) { setOffset(partition, offset); } @@ -335,4 +368,4 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 7b7bdcc..51682ab 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.connectors.kafka; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -44,10 +44,12 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.network.SocketServer; import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.collections.map.LinkedMap; import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; @@ -181,6 +183,82 
 		zkClient.close();
 	}
 
+	// -------------------------- test checkpointing ------------------------
+
+	@Test
+	public void testCheckpointing() throws Exception {
+		createTestTopic("testCheckpointing", 1, 1);
+
+		Properties props = new Properties();
+		props.setProperty("zookeeper.connect", zookeeperConnectionString);
+		props.setProperty("group.id", "testCheckpointing");
+		props.setProperty("auto.commit.enable", "false");
+		ConsumerConfig cc = new ConsumerConfig(props);
+		PersistentKafkaSource<String> source = new PersistentKafkaSource<String>("testCheckpointing", new FakeDeserializationSchema(), cc);
+
+
+		Field pendingCheckpointsField = PersistentKafkaSource.class.getDeclaredField("pendingCheckpoints");
+		pendingCheckpointsField.setAccessible(true);
+		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+
+		Assert.assertEquals(0, pendingCheckpoints.size());
+		// first restore
+		source.restoreState(new long[]{1337});
+		// then open
+		source.open(new Configuration());
+		long[] state1 = source.snapshotState(1, 15);
+		Assert.assertArrayEquals(new long[]{1337}, state1);
+		long[] state2 = source.snapshotState(2, 30);
+		Assert.assertArrayEquals(new long[]{1337}, state2);
+		Assert.assertEquals(2, pendingCheckpoints.size());
+
+		source.commitCheckpoint(1);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		source.commitCheckpoint(2);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		source.commitCheckpoint(666); // invalid checkpoint
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		// create 500 snapshots
+		for(int i = 0; i < 500; i++) {
+			source.snapshotState(i, 15 * i);
+		}
+		Assert.assertEquals(500, pendingCheckpoints.size());
+
+		// commit only the second last
+		source.commitCheckpoint(498);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		// access invalid checkpoint
+		source.commitCheckpoint(490);
+
+		// and the last
+		source.commitCheckpoint(499);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+	}
+
+	private static class FakeDeserializationSchema implements DeserializationSchema<String> {
+
+		@Override
+		public String deserialize(byte[] message) {
+			return null;
+		}
+
+		@Override
+		public boolean isEndOfStream(String nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return null;
+		}
+	}
+
+	// ---------------------------------------------------------------
+
 	@Test
 	public void testOffsetManipulation() {
@@ -234,9 +312,9 @@ public class KafkaITCase {
 		long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
 		long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
 		long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
-		Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
-		Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
-		Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got " + o1, o1 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got " + o2, o2 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got " + o3, o3 > 50L);
 		/** Once we have proper shutdown of streaming jobs, enable these tests
 		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
 		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index 2cab7a3..cb49dba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -56,7 +56,9 @@ public interface Checkpointed<T extends Serializable> {
 	/**
 	 * Restores the state of the function or operator to that of a previous checkpoint.
 	 * This method is invoked when a function is executed as part of a recovery run.
-	 * *
+	 *
+	 * Note that restoreState() is called before open().
+	 *
 	 * @param state The state to be restored.
 	 */
 	void restoreState(T state);
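
The heart of the fix is the pendingCheckpoints bookkeeping in the first file: offsets are recorded per checkpoint id in an insertion-ordered map, and when a checkpoint is committed, every older pending entry is pruned as well, because checkpoints that failed or were superseded will never receive a commit and would otherwise accumulate without bound. Below is a minimal standalone sketch of that pattern, using the same commons-collections LinkedMap; the class and method names (OffsetCheckpointTracker, onSnapshot, onCommit) are illustrative only and not part of the Flink API.

import org.apache.commons.collections.map.LinkedMap;

// Illustrative sketch only (not Flink API): per-checkpoint offset bookkeeping
// that prunes superseded checkpoints so entries for failed ones cannot leak.
public class OffsetCheckpointTracker {

	// LinkedMap keeps insertion order, so index 0 is always the oldest pending checkpoint.
	private final LinkedMap pendingCheckpoints = new LinkedMap();

	// Called when a snapshot is taken: remember the offsets for this checkpoint id.
	public void onSnapshot(long checkpointId, long[] offsets) {
		pendingCheckpoints.put(checkpointId, offsets.clone());
	}

	// Called when a checkpoint is committed: return its offsets (e.g. to write to
	// ZooKeeper) and drop every older pending checkpoint, which will never be committed.
	public long[] onCommit(long checkpointId) {
		final int posInMap = pendingCheckpoints.indexOf(checkpointId);
		if (posInMap == -1) {
			return null; // unknown or already pruned checkpoint id
		}
		long[] offsets = (long[]) pendingCheckpoints.remove(posInMap);
		for (int i = 0; i < posInMap; i++) {
			pendingCheckpoints.remove(0); // prune older, superseded checkpoints
		}
		return offsets;
	}

	public int numPendingCheckpoints() {
		return pendingCheckpoints.size();
	}
}

Pruning by position rather than only removing the committed key is what keeps the map bounded: after a commit, at most the checkpoints newer than the committed one remain pending, mirroring what commitCheckpoint() in the patch does and what the new testCheckpointing() test asserts.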