Repository: flink
Updated Branches:
  refs/heads/master ac43a69c1 -> 033409190


[FLINK-2004] Fix memory leak in presence of failed checkpoints in Kafka source

This closes #674


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d294735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d294735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d294735

Branch: refs/heads/master
Commit: 7d294735eb3a77debbe2215c44dea4f97bcf7165
Parents: ac43a69
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu May 14 11:45:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jun 1 17:00:02 2015 +0200

----------------------------------------------------------------------
 .../api/persistent/PersistentKafkaSource.java   | 77 +++++++++++++-----
 .../streaming/connectors/kafka/KafkaITCase.java | 86 +++++++++++++++++++-
 .../streaming/api/checkpoint/Checkpointed.java  |  4 +-
 3 files changed, 140 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 032ed08..84fd7b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -50,7 +51,6 @@ import java.io.ObjectOutputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -65,20 +65,26 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
                ResultTypeQueryable<OUT>,
                CheckpointCommitter,
                CheckpointedAsynchronously<long[]> {
+
+       private static final long serialVersionUID = 287845877188312621L;
+       
        private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
 
+       
+       private final String topicName;
+       private final DeserializationSchema<OUT> deserializationSchema;
+       
        protected transient ConsumerConfig consumerConfig;
        private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
        private transient ConsumerConnector consumer;
-
-       private String topicName;
-       private DeserializationSchema<OUT> deserializationSchema;
-       private boolean running = true;
-
-       private transient long[] lastOffsets;
+       
        private transient ZkClient zkClient;
+       private transient long[] lastOffsets;
        private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
-
+       private transient long[] restoreState;
+       
+       private final LinkedMap pendingCheckpoints = new LinkedMap();
+       
        // We set this in reachedEnd to carry it over to next()
        private OUT nextElement = null;
 
@@ -133,19 +139,33 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
                this.consumer = consumer;
 
                zkClient = new ZkClient(consumerConfig.zkConnect(),
-                       consumerConfig.zkSessionTimeoutMs(),
-                       consumerConfig.zkConnectionTimeoutMs(),
-                       new KafkaZKStringSerializer());
+                               consumerConfig.zkSessionTimeoutMs(),
+                               consumerConfig.zkConnectionTimeoutMs(),
+                               new KafkaZKStringSerializer());
 
                // most likely the number of offsets we're going to store here will be lower than the number of partitions.
                int numPartitions = getNumberOfPartitions();
                LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
                this.lastOffsets = new long[numPartitions];
                this.commitedOffsets = new long[numPartitions];
-               Arrays.fill(this.lastOffsets, -1);
-               Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+               // check if there are offsets to restore
+               if (restoreState != null) {
+                       if (restoreState.length != numPartitions) {
+                       throw new IllegalStateException("There are "+restoreState.length+" offsets to restore for topic "+topicName+" but " +
+                                               "there are only "+numPartitions+" in the topic");
+                       }
 
+                       LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(restoreState));
+                       setOffsetsInZooKeeper(restoreState);
+                       this.lastOffsets = restoreState;
+               } else {
+                       // initialize empty offsets
+                       Arrays.fill(this.lastOffsets, -1);
+               }
+               Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+               
                nextElement = null;
+               pendingCheckpoints.clear();
        }
 
        @Override
@@ -200,10 +220,9 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
        // ---------------------- State / Checkpoint handling  -----------------
        // this source is keeping the partition offsets in Zookeeper
 
-       private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
-
        @Override
        public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+
                if(lastOffsets == null) {
                        LOG.warn("State snapshot requested on not yet opened source. Returning null");
                        return null;
@@ -217,7 +236,8 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 
        @Override
        public void restoreState(long[] state) {
-               // we maintain the offsets in Kafka, so nothing to do.
+               LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state));
+               this.restoreState = Arrays.copyOf(state, state.length);
        }
 
 
@@ -228,15 +248,28 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
        @Override
        public void commitCheckpoint(long checkpointId) {
                LOG.info("Commit checkpoint {}", checkpointId);
-               long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
-               if(checkpointOffsets == null) {
+               final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+               if(posInMap == -1) {
                        LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
                        return;
                }
-               LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
 
-               for(int partition = 0; partition < checkpointOffsets.length; partition++) {
-                       long offset = checkpointOffsets[partition];
+               long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+               LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
+
+               // remove older checkpoints in map:
+               if(!pendingCheckpoints.isEmpty()) {
+                       for(int i = 0; i < posInMap; i++) {
+                               pendingCheckpoints.remove(0);
+                       }
+               }
+
+               setOffsetsInZooKeeper(checkpointOffsets);
+       }
+
+       private void setOffsetsInZooKeeper(long[] offsets) {
+               for(int partition = 0; partition < offsets.length; partition++) {
+                       long offset = offsets[partition];
                        if(offset != -1) {
                                setOffset(partition, offset);
                        }
@@ -335,4 +368,4 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
                        }
                }
        }
-}
+}
\ No newline at end of file
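
The core of the fix is visible in the hunks above: snapshotState() registers each checkpoint's offsets in an insertion-ordered LinkedMap, and commitCheckpoint() removes not only the committed entry but every older entry before it. Checkpoints that fail are never committed directly, yet they are discarded as soon as any newer checkpoint completes, so the map can no longer grow without bound. A minimal standalone sketch of that bookkeeping pattern (class name and signatures are hypothetical, not Flink API):

import org.apache.commons.collections.map.LinkedMap;

// Sketch of the pruning pattern used above (hypothetical class, not part of the commit).
public class PendingCheckpointTracker {

	private final LinkedMap pendingCheckpoints = new LinkedMap();

	// on every snapshot: remember the offsets under their checkpoint id
	public void snapshot(long checkpointId, long[] offsets) {
		pendingCheckpoints.put(checkpointId, offsets);
	}

	// on commit: fetch the offsets and drop all older, possibly failed, entries
	public long[] commit(long checkpointId) {
		int posInMap = pendingCheckpoints.indexOf(checkpointId);
		if (posInMap == -1) {
			return null; // unknown or already pruned checkpoint
		}
		long[] offsets = (long[]) pendingCheckpoints.remove(posInMap);
		// entries before posInMap belong to checkpoints that will never be
		// committed; removing them is what bounds the map's size
		for (int i = 0; i < posInMap; i++) {
			pendingCheckpoints.remove(0);
		}
		return offsets;
	}
}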

http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 7b7bdcc..51682ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -44,10 +44,12 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import kafka.network.SocketServer;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -181,6 +183,82 @@ public class KafkaITCase {
                zkClient.close();
        }
 
+       // --------------------------  test checkpointing ------------------------
+       @Test
+       public void testCheckpointing() throws Exception {
+               createTestTopic("testCheckpointing", 1, 1);
+
+               Properties props = new Properties();
+               props.setProperty("zookeeper.connect", zookeeperConnectionString);
+               props.setProperty("group.id", "testCheckpointing");
+               props.setProperty("auto.commit.enable", "false");
+               ConsumerConfig cc = new ConsumerConfig(props);
+               PersistentKafkaSource<String> source = new PersistentKafkaSource<String>("testCheckpointing", new FakeDeserializationSchema(), cc);
+
+
+               Field pendingCheckpointsField = PersistentKafkaSource.class.getDeclaredField("pendingCheckpoints");
+               pendingCheckpointsField.setAccessible(true);
+               LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+
+               Assert.assertEquals(0, pendingCheckpoints.size());
+               // first restore
+               source.restoreState(new long[]{1337});
+               // then open
+               source.open(new Configuration());
+               long[] state1 = source.snapshotState(1, 15);
+               Assert.assertArrayEquals(new long[]{1337}, state1);
+               long[] state2 = source.snapshotState(2, 30);
+               Assert.assertArrayEquals(new long[]{1337}, state2);
+               Assert.assertEquals(2, pendingCheckpoints.size());
+
+               source.commitCheckpoint(1);
+               Assert.assertEquals(1, pendingCheckpoints.size());
+
+               source.commitCheckpoint(2);
+               Assert.assertEquals(0, pendingCheckpoints.size());
+
+               source.commitCheckpoint(666); // invalid checkpoint
+               Assert.assertEquals(0, pendingCheckpoints.size());
+
+               // create 500 snapshots
+               for(int i = 0; i < 500; i++) {
+                       source.snapshotState(i, 15 * i);
+               }
+               Assert.assertEquals(500, pendingCheckpoints.size());
+
+               // commit only the second last
+               source.commitCheckpoint(498);
+               Assert.assertEquals(1, pendingCheckpoints.size());
+
+               // access invalid checkpoint
+               source.commitCheckpoint(490);
+
+               // and the last
+               source.commitCheckpoint(499);
+               Assert.assertEquals(0, pendingCheckpoints.size());
+       }
+
+       private static class FakeDeserializationSchema implements DeserializationSchema<String> {
+
+               @Override
+               public String deserialize(byte[] message) {
+                       return null;
+               }
+
+               @Override
+               public boolean isEndOfStream(String nextElement) {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<String> getProducedType() {
+                       return null;
+               }
+       }
+
+       // ---------------------------------------------------------------
+
 
        @Test
        public void testOffsetManipulation() {
@@ -234,9 +312,9 @@ public class KafkaITCase {
                long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
                long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
                long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
-               Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
-               Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
-               Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
+               Assert.assertTrue("The offset seems incorrect, got " + o1, o1 > 50L);
+               Assert.assertTrue("The offset seems incorrect, got " + o2, o2 > 50L);
+               Assert.assertTrue("The offset seems incorrect, got " + o3, o3 > 50L);
                /** Once we have proper shutdown of streaming jobs, enable these tests
                Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
                Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
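
The new test reads the source's private pendingCheckpoints field through reflection rather than widening its visibility. A minimal sketch of that testing technique in isolation (hypothetical helper, not part of the commit):

import java.lang.reflect.Field;

// Hypothetical test helper: read a private field for assertions without
// changing the production class. Note that getDeclaredField() only sees
// fields declared directly on the given class, not inherited ones.
public final class Reflective {

	private Reflective() {}

	@SuppressWarnings("unchecked")
	public static <T> T readField(Object target, String fieldName) throws Exception {
		Field field = target.getClass().getDeclaredField(fieldName);
		field.setAccessible(true);
		return (T) field.get(target);
	}
}

// usage, mirroring the test above:
//   LinkedMap pending = Reflective.readField(source, "pendingCheckpoints");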

http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index 2cab7a3..cb49dba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -56,7 +56,9 @@ public interface Checkpointed<T extends Serializable> {
        /**
         * Restores the state of the function or operator to that of a previous checkpoint.
         * This method is invoked when a function is executed as part of a recovery run.
-        *       * 
+        *
+        * Note that restoreState() is called before open().
+        *
         * @param state The state to be restored. 
         */
        void restoreState(T state);
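
Because restoreState() runs before open(), restored values cannot be applied directly to structures that open() has not yet created. The PersistentKafkaSource change above therefore stashes the state in restoreState() and applies it in open(). A minimal sketch of that stash-then-apply pattern (hypothetical class with a simplified lifecycle, not Flink API):

import java.util.Arrays;

// Hypothetical sketch: restoreState() only records the state, open() applies
// it once the runtime structures (partition count, offset array) exist.
public class RestorableOffsetSource {

	private transient long[] restoreState; // stashed by restoreState()
	private transient long[] lastOffsets;  // live offsets, created in open()

	public void restoreState(long[] state) {
		// called before open(): just keep a defensive copy
		this.restoreState = Arrays.copyOf(state, state.length);
	}

	public void open(int numPartitions) {
		if (restoreState != null) {
			if (restoreState.length != numPartitions) {
				throw new IllegalStateException("Restored state has " + restoreState.length
						+ " offsets, but the topic has " + numPartitions + " partitions");
			}
			this.lastOffsets = restoreState; // apply the stashed state
		} else {
			this.lastOffsets = new long[numPartitions];
			Arrays.fill(this.lastOffsets, -1L); // fresh start: offsets unknown
		}
	}
}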
