Repository: flink
Updated Branches:
  refs/heads/master 8d1efa045 -> 84b39dcb5


[FLINK-2008] [FLINK-2296] Fix checkpoint committing & KafkaITCase

This closes #895


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa5e5b30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa5e5b30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa5e5b30

Branch: refs/heads/master
Commit: aa5e5b3087a68f2aac792c0b0fc64b4f9c707e9b
Parents: 8d1efa0
Author: Robert Metzger <[email protected]>
Authored: Mon Jun 29 16:52:38 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Jul 13 17:54:30 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |   3 +
 .../checkpoint/CheckpointCoordinator.java       |  11 +-
 .../checkpoint/SuccessfulCheckpoint.java        |  22 +--
 .../runtime/executiongraph/ExecutionVertex.java |   4 +
 .../tasks/CheckpointCommittingOperator.java     |  27 ----
 .../tasks/CheckpointNotificationOperator.java   |  25 ++++
 .../messages/checkpoint/ConfirmCheckpoint.java  |  88 ------------
 .../checkpoint/NotifyCheckpointComplete.java    |  73 ++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  20 +--
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  18 +--
 .../messages/CheckpointMessagesTest.java        |   4 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  12 +-
 .../flink/streaming/connectors/kafka/Utils.java |  30 +++-
 .../api/persistent/PersistentKafkaSource.java   | 108 ++++++++++----
 .../streaming/connectors/kafka/KafkaITCase.java | 144 +++++++++++++++----
 .../connectors/kafka/util/UtilsTest.java        |  75 ++++++++++
 .../api/checkpoint/CheckpointCommitter.java     |  44 ------
 .../api/checkpoint/CheckpointNotifier.java      |  37 +++++
 .../api/graph/StreamingJobGraphGenerator.java   |   5 +-
 .../operators/AbstractUdfStreamOperator.java    |   9 +-
 .../api/operators/StatefulStreamOperator.java   |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  80 ++++-------
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../StreamCheckpointingITCase.java              |  14 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   6 +-
 26 files changed, 525 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index e337ea8..02da8cb 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1319,6 +1319,9 @@ Another way of exposing user defined operator state for 
the Flink runtime for ch
 
 When the user defined function implements the `Checkpointed` interface, the 
`snapshotState(…)` and `restoreState(…)` methods will be executed to draw 
and restore function state.
 
+In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on completed 
checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
+Note that there is no guarantee for the user function to receive a 
notification if a failure happens between checkpoint completion and 
notification. The notifications should hence be treated in a way that 
notifications from later checkpoints can subsume missing notifications.
+
 For example the same counting, reduce function shown for `OperatorState`s by 
using the `Checkpointed` interface instead:
 
 {% highlight java %}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 91fd424..2b2bf6b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -30,10 +30,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -375,11 +373,8 @@ public class CheckpointCoordinator {
                                Execution ee = ev.getCurrentExecutionAttempt();
                                if (ee != null) {
                                        ExecutionAttemptID attemptId = 
ee.getAttemptId();
-                                       StateForTask stateForTask = 
completed.getState(ev.getJobvertexId());
-                                       SerializedValue<StateHandle<?>> 
taskState = (stateForTask != null) ? stateForTask.getState() : null;
-                                       ConfirmCheckpoint confirmMessage = new 
ConfirmCheckpoint(job, attemptId, checkpointId, 
-                                                       timestamp, taskState);
-                                       
ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
+                                       NotifyCheckpointComplete notifyMessage 
= new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
+                                       
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index 5432d33..be0b301 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
@@ -41,9 +38,7 @@ public class SuccessfulCheckpoint {
        
        private final long timestamp;
        
-       private final Map<JobVertexID, StateForTask> vertexToState;
-       
-       private final List<StateForTask> states; 
+       private final List<StateForTask> states;
 
 
        public SuccessfulCheckpoint(JobID job, long checkpointID, long 
timestamp, List<StateForTask> states) {
@@ -51,10 +46,6 @@ public class SuccessfulCheckpoint {
                this.checkpointID = checkpointID;
                this.timestamp = timestamp;
                this.states = states;
-               vertexToState = Maps.newHashMap();
-               for(StateForTask state : states){
-                       vertexToState.put(state.getOperatorId(), state);
-               }
        }
 
        public JobID getJobId() {
@@ -73,17 +64,6 @@ public class SuccessfulCheckpoint {
                return states;
        }
 
-       /**
-        * Returns the task state included in the checkpoint for a given 
JobVertexID if it exists or 
-        * null if no state is included for that id.
-        * 
-        * @param jobVertexID
-        * @return
-        */
-       public StateForTask getState(JobVertexID jobVertexID) {
-               return vertexToState.get(jobVertexID);
-       }
-
        // 
--------------------------------------------------------------------------------------------
        
        public void discard(ClassLoader userClassLoader) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d44cb6a..a70fa7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -168,6 +168,10 @@ public class ExecutionVertex implements Serializable {
                                getTotalNumberOfParallelSubtasks());
        }
 
+       public int getSubTaskIndex() {
+               return subTaskIndex;
+       }
+
        public int getTotalNumberOfParallelSubtasks() {
                return this.jobVertex.getParallelism();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
deleted file mode 100644
index a6f9851..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
-
-public interface CheckpointCommittingOperator {
-       
-       void confirmCheckpoint(long checkpointId, 
SerializedValue<StateHandle<?>> state) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
new file mode 100644
index 0000000..90c82b7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+
+public interface CheckpointNotificationOperator {
+       
+       void notifyCheckpointComplete(long checkpointId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
deleted file mode 100644
index 328f692..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
-
-/**
- * This message is sent from the {@link 
org.apache.flink.runtime.jobmanager.JobManager} to the
- * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task 
that the checkpoint
- * has been confirmed and that the task can commit the checkpoint to the 
outside world.
- */
-public class ConfirmCheckpoint extends AbstractCheckpointMessage implements 
java.io.Serializable {
-
-       private static final long serialVersionUID = 2094094662279578953L;
-
-       /** The timestamp associated with the checkpoint */
-       private final long timestamp;
-
-       /** The stateHandle associated with the checkpoint confirmation 
message*/
-       private final SerializedValue<StateHandle<?>> state;
-       
-       public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, long timestamp, 
-               SerializedValue<StateHandle<?>> state) {
-               super(job, taskExecutionId, checkpointId);
-               this.timestamp = timestamp;
-               this.state = state;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public long getTimestamp() {
-               return timestamp;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Returns the stateHandle that was included in the confirmed 
checkpoint for a given task or null
-        * if no state was commited in that checkpoint.
-        */
-       public SerializedValue<StateHandle<?>> getState() {
-               return state;
-       }
-       
-       @Override
-       public int hashCode() {
-               return super.hashCode() + (int) (timestamp ^ (timestamp >>> 
32));
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               else if (o instanceof ConfirmCheckpoint) {
-                       ConfirmCheckpoint that = (ConfirmCheckpoint) o;
-                       return this.timestamp == that.timestamp && 
super.equals(o);
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public String toString() {
-               return String.format("ConfirmCheckpoint %d for (%s/%s)", 
-                               getCheckpointId(), getJob(), 
getTaskExecutionId());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java
new file mode 100644
index 0000000..c64c77a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link 
org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task 
that the checkpoint
+ * has been confirmed and that the task can commit the checkpoint to the 
outside world.
+ */
+public class NotifyCheckpointComplete extends AbstractCheckpointMessage 
implements java.io.Serializable {
+
+       private static final long serialVersionUID = 2094094662279578953L;
+
+       /** The timestamp associated with the checkpoint */
+       private final long timestamp;
+
+       public NotifyCheckpointComplete(JobID job, ExecutionAttemptID 
taskExecutionId, long checkpointId, long timestamp) {
+               super(job, taskExecutionId, checkpointId);
+               this.timestamp = timestamp;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public long getTimestamp() {
+               return timestamp;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return super.hashCode() + (int) (timestamp ^ (timestamp >>> 
32));
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               else if (o instanceof NotifyCheckpointComplete) {
+                       NotifyCheckpointComplete that = 
(NotifyCheckpointComplete) o;
+                       return this.timestamp == that.timestamp && 
super.equals(o);
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return String.format("NotifyCheckpointComplete %d for (%s/%s)", 
+                               getCheckpointId(), getJob(), 
getTaskExecutionId());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1b2fb08..616998c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,7 +46,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -883,11 +883,11 @@ public class Task implements Runnable {
                                                        
checkpointer.triggerCheckpoint(checkpointID, checkpointTimestamp);
                                                }
                                                catch (Throwable t) {
-                                                       logger.error("Error 
while triggering checkpoint for " + taskName, t);
+                                                       failExternally(new 
RuntimeException("Error while triggering checkpoint for " + taskName, t));
                                                }
                                        }
                                };
-                               executeAsyncCallRunnable(runnable, "Checkpoint 
Trigger");
+                               executeAsyncCallRunnable(runnable, "Checkpoint 
Trigger for " + taskName);
                        }
                        else {
                                LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - "
@@ -899,15 +899,14 @@ public class Task implements Runnable {
                }
        }
        
-       public void confirmCheckpoint(final long checkpointID, 
-               final SerializedValue<StateHandle<?>> state) {
+       public void notifyCheckpointComplete(final long checkpointID) {
                AbstractInvokable invokable = this.invokable;
 
                if (executionState == ExecutionState.RUNNING && invokable != 
null) {
-                       if (invokable instanceof CheckpointCommittingOperator) {
+                       if (invokable instanceof 
CheckpointNotificationOperator) {
 
                                // build a local closure 
-                               final CheckpointCommittingOperator checkpointer 
= (CheckpointCommittingOperator) invokable;
+                               final CheckpointNotificationOperator 
checkpointer = (CheckpointNotificationOperator) invokable;
                                final Logger logger = LOG;
                                final String taskName = taskNameWithSubtask;
 
@@ -915,14 +914,15 @@ public class Task implements Runnable {
                                        @Override
                                        public void run() {
                                                try {
-                                                       
checkpointer.confirmCheckpoint(checkpointID, state);
+                                                       
checkpointer.notifyCheckpointComplete(checkpointID);
                                                }
                                                catch (Throwable t) {
-                                                       logger.error("Error 
while confirming checkpoint for " + taskName, t);
+                                                       // fail task if 
checkpoint confirmation failed.
+                                                       failExternally(new 
RuntimeException("Error while confirming checkpoint", t));
                                                }
                                        }
                                };
-                               executeAsyncCallRunnable(runnable, "Checkpoint 
Confirmation");
+                               executeAsyncCallRunnable(runnable, "Checkpoint 
Confirmation for " + taskName);
                        }
                        else {
                                LOG.error("Task received a checkpoint commit 
notification, but is not a checkpoint committing task - "

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8c816a1..520decd 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -36,7 +36,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{Configuration, ConfigConstants, 
GlobalConfiguration, IllegalConfigurationException}
-import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, 
TriggerCheckpoint, AbstractCheckpointMessage}
+import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, 
TriggerCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, 
ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
@@ -425,17 +425,16 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
           log.debug(s"Taskmanager received a checkpoint request for unknown 
task $taskExecutionId.")
         }
 
-      case message: ConfirmCheckpoint =>
+      case message: NotifyCheckpointComplete =>
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
-        val state = message.getState
 
         log.debug(s"Receiver ConfirmCheckpoint ${checkpointId}@${timestamp} 
for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.confirmCheckpoint(checkpointId, state)
+          task.notifyCheckpointComplete(checkpointId)
         } else {
           log.debug(
             s"Taskmanager received a checkpoint confirmation for unknown task 
$taskExecutionId.")

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d56704d..c02d301 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.junit.Test;
 
@@ -199,8 +199,8 @@ public class CheckpointCoordinatorTest {
                        
                        // validate that the relevant tasks got a confirmation 
message
                        {
-                               ConfirmCheckpoint confirmMessage1 = new 
ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp, null);
-                               ConfirmCheckpoint confirmMessage2 = new 
ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp, null);
+                               NotifyCheckpointComplete confirmMessage1 = new 
NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
+                               NotifyCheckpointComplete confirmMessage2 = new 
NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp);
                                verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
                                verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
                        }
@@ -237,8 +237,8 @@ public class CheckpointCoordinatorTest {
                                verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
                                verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 
-                               ConfirmCheckpoint confirmMessage1 = new 
ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew, null);
-                               ConfirmCheckpoint confirmMessage2 = new 
ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew, null);
+                               NotifyCheckpointComplete confirmMessage1 = new 
NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew);
+                               NotifyCheckpointComplete confirmMessage2 = new 
NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew);
                                verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
                                verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
                        }
@@ -343,7 +343,7 @@ public class CheckpointCoordinatorTest {
                        
                        // the first confirm message should be out
                        verify(commitVertex, 
times(1)).sendMessageToCurrentExecution(
-                                       new ConfirmCheckpoint(jid, 
commitAttemptID, checkpointId1, timestamp1, null), commitAttemptID);
+                                       new NotifyCheckpointComplete(jid, 
commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
                        
                        // send the last remaining ack for the second checkpoint
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
@@ -355,7 +355,7 @@ public class CheckpointCoordinatorTest {
 
                        // the second commit message should be out
                        verify(commitVertex, 
times(1)).sendMessageToCurrentExecution(
-                                       new ConfirmCheckpoint(jid, 
commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
+                                       new NotifyCheckpointComplete(jid, 
commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
                        
                        // validate the committed checkpoints
                        List<SuccessfulCheckpoint> scs = 
coord.getSuccessfulCheckpoints();
@@ -482,7 +482,7 @@ public class CheckpointCoordinatorTest {
 
                        // the first confirm message should be out
                        verify(commitVertex, 
times(1)).sendMessageToCurrentExecution(
-                                       new ConfirmCheckpoint(jid, 
commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
+                                       new NotifyCheckpointComplete(jid, 
commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
 
                        // send the last remaining ack for the first 
checkpoint. This should not do anything
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
@@ -551,7 +551,7 @@ public class CheckpointCoordinatorTest {
 
                        // no confirm message must have been sent
                        verify(commitVertex, times(0))
-                                       
.sendMessageToCurrentExecution(any(ConfirmCheckpoint.class), 
any(ExecutionAttemptID.class));
+                                       
.sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), 
any(ExecutionAttemptID.class));
                        
                        coord.shutdown();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 05255ac..211d5e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
@@ -38,7 +38,7 @@ public class CheckpointMessagesTest {
        @Test
        public void testTriggerAndConfirmCheckpoint() {
                try {
-                       ConfirmCheckpoint cc = new ConfirmCheckpoint(new 
JobID(), new ExecutionAttemptID(), 45287698767345L, 467L, null);
+                       NotifyCheckpointComplete cc = new 
NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 
45287698767345L, 467L);
                        testSerializabilityEqualsHashCode(cc);
                        
                        TriggerCheckpoint tc = new TriggerCheckpoint(new 
JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 618c01f..b18bf2a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -40,12 +40,10 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -134,7 +132,7 @@ public class TaskAsyncCallTest {
 
                        for (int i = 1; i <= NUM_CALLS; i++) {
                                task.triggerCheckpointBarrier(i, 156865867234L);
-                               task.confirmCheckpoint(i, null);
+                               task.notifyCheckpointComplete(i);
                        }
 
                        triggerLatch.await();
@@ -186,7 +184,7 @@ public class TaskAsyncCallTest {
        }
        
        public static class CheckpointsInOrderInvokable extends 
AbstractInvokable
-                       implements CheckpointedOperator, 
CheckpointCommittingOperator {
+                       implements CheckpointedOperator, 
CheckpointNotificationOperator {
 
                private volatile long lastCheckpointId = 0;
                
@@ -213,7 +211,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void triggerCheckpoint(long checkpointId, long 
timestamp) throws Exception {
+               public void triggerCheckpoint(long checkpointId, long 
timestamp) {
                        lastCheckpointId++;
                        if (checkpointId == lastCheckpointId) {
                                if (lastCheckpointId == NUM_CALLS) {
@@ -229,7 +227,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void confirmCheckpoint(long checkpointId, 
SerializedValue<StateHandle<?>> state) throws Exception {
+               public void notifyCheckpointComplete(long checkpointId) {
                        if (checkpointId != lastCheckpointId && this.error == 
null) {
                                this.error = new Exception("calls out of 
order");
                                synchronized (this) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
index 4286196..abe06b8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -27,16 +27,25 @@ import 
org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.io.IOException;
 
+/**
+ * Utilities for the Kafka connector
+ */
 public class Utils {
-       public static class TypeInformationSerializationSchema<T>
-                       implements DeserializationSchema<T>, 
SerializationSchema<T, byte[]> {
+
+       /**
+        * Utility serialization schema, created from Flink's TypeInformation 
system.
+        * @param <T>
+        */
+       public static class TypeInformationSerializationSchema<T> implements 
DeserializationSchema<T>, SerializationSchema<T, byte[]> {
                private final TypeSerializer<T> serializer;
                private final TypeInformation<T> ti;
+               private transient DataOutputSerializer dos;
 
-               public TypeInformationSerializationSchema(Object type, 
ExecutionConfig ec) {
-                       this.ti = (TypeInformation<T>) 
TypeExtractor.getForObject(type);
+               public TypeInformationSerializationSchema(T type, 
ExecutionConfig ec) {
+                       this.ti = TypeExtractor.getForObject(type);
                        this.serializer = ti.createSerializer(ec);
                }
+
                @Override
                public T deserialize(byte[] message) {
                        try {
@@ -53,13 +62,22 @@ public class Utils {
 
                @Override
                public byte[] serialize(T element) {
-                       DataOutputSerializer dos = new DataOutputSerializer(16);
+                       if(dos == null) {
+                               dos = new DataOutputSerializer(16);
+                       }
                        try {
                                serializer.serialize(element, dos);
                        } catch (IOException e) {
                                throw new RuntimeException("Unable to serialize 
record", e);
                        }
-                       return dos.getByteArray();
+                       byte[] ret = dos.getByteArray();
+                       if(ret.length != dos.length()) {
+                               byte[] n = new byte[dos.length()];
+                               System.arraycopy(ret, 0, n, 0, dos.length());
+                               ret = n;
+                       }
+                       dos.clear();
+                       return ret;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 6758f2c..b4a0b8b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.streaming.connectors.kafka.api.persistent;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,12 +40,12 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.zookeeper.data.Stat;
@@ -67,13 +66,14 @@ import com.google.common.base.Preconditions;
  */
 public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT> implements
                ResultTypeQueryable<OUT>,
-               CheckpointCommitter {
+               CheckpointNotifier, CheckpointedAsynchronously<long[]> {
 
        private static final long serialVersionUID = 287845877188312621L;
        
        private static final Logger LOG = 
LoggerFactory.getLogger(PersistentKafkaSource.class);
 
-       
+       private final LinkedMap pendingCheckpoints = new LinkedMap();
+
        private final String topicName;
        private final DeserializationSchema<OUT> deserializationSchema;
        
@@ -82,9 +82,11 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
        private transient ConsumerConnector consumer;
        
        private transient ZkClient zkClient;
-       private transient OperatorState<long[]> lastOffsets;
-       private transient long[] commitedOffsets; // maintain committed 
offsets, to avoid committing the same over and over again.
-       
+
+       private transient long[] lastOffsets;                   // Current 
offset (backed up state)
+       protected transient long[] commitedOffsets;     // maintain committed 
offsets, to avoid committing the same over and over again.
+       private transient long[] restoreState;                  // set by the 
restore() method, used by open() to validate the restored state.
+
        private volatile boolean running;
        
        /**
@@ -145,23 +147,26 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
                // most likely the number of offsets we're going to store here 
will be lower than the number of partitions.
                int numPartitions = getNumberOfPartitions();
                LOG.debug("The topic {} has {} partitions", topicName, 
numPartitions);
-               this.lastOffsets = 
getRuntimeContext().getOperatorState("offset", new long[numPartitions], false);
+               this.lastOffsets = new long[numPartitions];
                this.commitedOffsets = new long[numPartitions];
+
                // check if there are offsets to restore
-               if (!Arrays.equals(lastOffsets.value(), new 
long[numPartitions])) {
-                       if (lastOffsets.value().length != numPartitions) {
-                               throw new IllegalStateException("There are 
"+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " 
+
+               if (restoreState != null) {
+                       if (restoreState.length != numPartitions) {
+                               throw new IllegalStateException("There are 
"+restoreState.length+" offsets to restore for topic "+topicName+" but " +
                                                "there are only 
"+numPartitions+" in the topic");
                        }
 
-                       LOG.info("Setting restored offsets {} in ZooKeeper", 
Arrays.toString(lastOffsets.value()));
-                       setOffsetsInZooKeeper(lastOffsets.value());
+                       LOG.info("Setting restored offsets {} in ZooKeeper", 
Arrays.toString(restoreState));
+                       setOffsetsInZooKeeper(restoreState);
+                       this.lastOffsets = restoreState;
                } else {
                        // initialize empty offsets
-                       Arrays.fill(this.lastOffsets.value(), -1);
+                       Arrays.fill(this.lastOffsets, -1);
                }
                Arrays.fill(this.commitedOffsets, 0); // just to make it clear
-               
+
+               pendingCheckpoints.clear();
                running = true;
        }
 
@@ -175,7 +180,7 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
                
                while (running && iteratorToRead.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> message = 
iteratorToRead.next();
-                       if(lastOffsets.value()[message.partition()] >= 
message.offset()) {
+                       if(lastOffsets[message.partition()] >= 
message.offset()) {
                                LOG.info("Skipping message with offset {} from 
partition {}", message.offset(), message.partition());
                                continue;
                        }
@@ -188,7 +193,7 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
 
                        // make the state update and the element emission atomic
                        synchronized (checkpointLock) {
-                               lastOffsets.value()[message.partition()] = 
message.offset();
+                               lastOffsets[message.partition()] = 
message.offset();
                                ctx.collect(next);
                        }
 
@@ -210,19 +215,63 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
                zkClient.close();
        }
 
+       // -----------------  State Checkpointing -----------------
+
+       @Override
+       public long[] snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               if (lastOffsets == null) {
+                       LOG.warn("State snapshot requested on not yet opened 
source. Returning null");
+                       return null;
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Snapshotting state. Offsets: {}, checkpoint 
id {}, timestamp {}",
+                                       Arrays.toString(lastOffsets), 
checkpointId, checkpointTimestamp);
+               }
+
+               long[] currentOffsets = Arrays.copyOf(lastOffsets, 
lastOffsets.length);
+
+               // the map may be asynchronously updated when committing to 
Kafka, so we synchronize
+               synchronized (pendingCheckpoints) {
+                       pendingCheckpoints.put(checkpointId, currentOffsets);
+               }
+
+               return currentOffsets;
+       }
+
+       @Override
+       public void restoreState(long[] state) {
+               LOG.info("The state will be restored to {} in the open() 
method", Arrays.toString(state));
+               this.restoreState = Arrays.copyOf(state, state.length);
+       }
+
        
        /**
         * Notification on completed checkpoints
         * @param checkpointId The ID of the checkpoint that has been completed.
         * @throws Exception 
         */
-       @Override
-       public void commitCheckpoint(long checkpointId, String stateName, 
StateHandle<Serializable> state) throws Exception {
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                LOG.info("Commit checkpoint {}", checkpointId);
 
                long[] checkpointOffsets;
 
-               checkpointOffsets = (long[]) state.getState();
+               // the map may be asynchronously updated when snapshotting 
state, so we synchronize
+               synchronized (pendingCheckpoints) {
+                       final int posInMap = 
pendingCheckpoints.indexOf(checkpointId);
+                       if (posInMap == -1) {
+                               LOG.warn("Unable to find pending checkpoint for 
id {}", checkpointId);
+                               return;
+                       }
+
+                       checkpointOffsets = (long[]) 
pendingCheckpoints.remove(posInMap);
+                       // remove older checkpoints in map:
+                       if (!pendingCheckpoints.isEmpty()) {
+                               for(int i = 0; i < posInMap; i++) {
+                                       pendingCheckpoints.remove(0);
+                               }
+                       }
+               }
 
                if (LOG.isInfoEnabled()) {
                        LOG.info("Committing offsets {} to ZooKeeper", 
Arrays.toString(checkpointOffsets));
@@ -254,11 +303,15 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
        }
 
        protected void setOffset(int partition, long offset) {
-               if(commitedOffsets[partition] < offset) {
-                       setOffset(zkClient, consumerConfig.groupId(), 
topicName, partition, offset);
-                       commitedOffsets[partition] = offset;
-               } else {
-                       LOG.debug("Ignoring offset {} for partition {} because 
it is already committed", offset, partition);
+               // synchronize because notifyCheckpointComplete is called using 
asynchronous worker threads (= multiple checkpoints might be confirmed 
concurrently)
+               synchronized (commitedOffsets) {
+                       if(commitedOffsets[partition] < offset) {
+                               LOG.info("Committed offsets {}, partition={}, 
offset={}, locking on {}", Arrays.toString(commitedOffsets), partition, offset, 
commitedOffsets.hashCode());
+                               setOffset(zkClient, consumerConfig.groupId(), 
topicName, partition, offset);
+                               commitedOffsets[partition] = offset;
+                       } else {
+                               LOG.debug("Ignoring offset {} for partition {} 
because it is already committed", offset, partition);
+                       }
                }
        }
 
@@ -305,7 +358,6 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
                return deserializationSchema.getProducedType();
        }
 
-
        // ---------------------- Zookeeper Serializer copied from Kafka 
(because it has private access there)  -----------------
 
        public static class KafkaZKStringSerializer implements ZkSerializer {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 4b763b2..e039592 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -46,6 +47,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -182,6 +184,63 @@ public class KafkaITCase {
                zkClient.close();
        }
 
+       // --------------------------  test checkpointing 
------------------------
+       @Test
+       public void testCheckpointing() throws Exception {
+               createTestTopic("testCheckpointing", 1, 1);
+
+               Properties props = new Properties();
+               props.setProperty("zookeeper.connect", 
zookeeperConnectionString);
+               props.setProperty("group.id", "testCheckpointing");
+               props.setProperty("auto.commit.enable", "false");
+               ConsumerConfig cc = new ConsumerConfig(props);
+               PersistentKafkaSource<String> source = new 
PersistentKafkaSource<String>("testCheckpointing", new 
FakeDeserializationSchema(), cc);
+
+
+               Field pendingCheckpointsField = 
PersistentKafkaSource.class.getDeclaredField("pendingCheckpoints");
+               pendingCheckpointsField.setAccessible(true);
+               LinkedMap pendingCheckpoints = (LinkedMap) 
pendingCheckpointsField.get(source);
+
+
+               Assert.assertEquals(0, pendingCheckpoints.size());
+               // first restore
+               source.restoreState(new long[]{1337});
+               // then open
+               source.open(new Configuration());
+               long[] state1 = source.snapshotState(1, 15);
+               Assert.assertArrayEquals(new long[]{1337}, state1);
+               long[] state2 = source.snapshotState(2, 30);
+               Assert.assertArrayEquals(new long[]{1337}, state2);
+               Assert.assertEquals(2, pendingCheckpoints.size());
+
+               source.notifyCheckpointComplete(1);
+               Assert.assertEquals(1, pendingCheckpoints.size());
+
+               source.notifyCheckpointComplete(2);
+               Assert.assertEquals(0, pendingCheckpoints.size());
+
+               source.notifyCheckpointComplete(666); // invalid checkpoint
+               Assert.assertEquals(0, pendingCheckpoints.size());
+
+               // create 500 snapshots
+               for(int i = 0; i < 500; i++) {
+                       source.snapshotState(i, 15 * i);
+               }
+               Assert.assertEquals(500, pendingCheckpoints.size());
+
+               // commit only the second last
+               source.notifyCheckpointComplete(498);
+               Assert.assertEquals(1, pendingCheckpoints.size());
+
+               // access invalid checkpoint
+               source.notifyCheckpointComplete(490);
+
+               // and the last
+               source.notifyCheckpointComplete(499);
+               Assert.assertEquals(0, pendingCheckpoints.size());
+       }
+
+
        private static class FakeDeserializationSchema implements 
DeserializationSchema<String> {
 
                @Override
@@ -220,6 +279,34 @@ public class KafkaITCase {
 
                zk.close();
        }
+
+       public static class TestPersistentKafkaSource<OUT> extends 
PersistentKafkaSource<OUT> {
+               private static Object sync = new Object();
+               public static long[] finalOffset;
+               public TestPersistentKafkaSource(String topicName, 
DeserializationSchema<OUT> deserializationSchema, ConsumerConfig 
consumerConfig) {
+                       super(topicName, deserializationSchema, consumerConfig);
+               }
+
+               @Override
+               public void close() {
+                       super.close();
+                       LOG.info("Starting close " 
+Arrays.toString(commitedOffsets));
+                       synchronized (sync) {
+                               if (finalOffset == null) {
+                                       finalOffset = new 
long[commitedOffsets.length];
+                               }
+                               for(int i = 0; i < commitedOffsets.length; i++) 
{
+                                       if(commitedOffsets[i] > 0) {
+                                               if(finalOffset[i] > 0) {
+                                                       throw new 
RuntimeException("This is unexpected on i = "+i);
+                                               }
+                                               finalOffset[i] = 
commitedOffsets[i];
+                                       }
+                               }
+                       }
+                       LOG.info("Finished closing. Final 
"+Arrays.toString(finalOffset));
+               }
+       }
        /**
         * We want to use the High level java consumer API but manage the 
offset in Zookeeper manually.
         *
@@ -246,27 +333,35 @@ public class KafkaITCase {
                // write a sequence from 0 to 99 to each of the three 
partitions.
                writeSequence(env, topicName, 0, 99);
 
-
                readSequence(env, standardCC, topicName, 0, 100, 300);
 
+               LOG.info("State in persistent kafka sources {}", 
TestPersistentKafkaSource.finalOffset);
+
                // check offsets to be set at least higher than 50.
                // correctly, we would expect them to be set to 99, but right 
now there is no way of stopping a topology once all pending
                // checkpoints have been committed.
                // To work around that limitation, the persistent kafka 
consumer is throtteled with a thread.sleep().
-               long o1 = PersistentKafkaSource.getOffset(zk, 
standardCC.groupId(), topicName, 0);
-               long o2 = PersistentKafkaSource.getOffset(zk, 
standardCC.groupId(), topicName, 1);
-               long o3 = PersistentKafkaSource.getOffset(zk, 
standardCC.groupId(), topicName, 2);
-               Assert.assertTrue("The offset seems incorrect, got " + o1, o1 > 
50L);
-               Assert.assertTrue("The offset seems incorrect, got " + o2, o2 > 
50L);
-               Assert.assertTrue("The offset seems incorrect, got " + o3, o3 > 
50L);
-               /** Once we have proper shutdown of streaming jobs, enable 
these tests
-                Assert.assertEquals("The offset seems incorrect", 99L, 
PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
-                Assert.assertEquals("The offset seems incorrect", 99L, 
PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
-                Assert.assertEquals("The offset seems incorrect", 99L, 
PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/
 
+               long o1 = -1, o2 = -1, o3 = -1;
+               if(TestPersistentKafkaSource.finalOffset[0] > 0) {
+                       o1 = PersistentKafkaSource.getOffset(zk, 
standardCC.groupId(), topicName, 0);
+                       Assert.assertTrue("The offset seems incorrect, got " + 
o1, o1 == TestPersistentKafkaSource.finalOffset[0]);
+               }
+               if(TestPersistentKafkaSource.finalOffset[1] > 0) {
+                       o2 = PersistentKafkaSource.getOffset(zk, 
standardCC.groupId(), topicName, 1);
+                       Assert.assertTrue("The offset seems incorrect, got " + 
o2, o2 == TestPersistentKafkaSource.finalOffset[1]);
+               }
+               if(TestPersistentKafkaSource.finalOffset[2] > 0) {
+                       o3 = PersistentKafkaSource.getOffset(zk, 
standardCC.groupId(), topicName, 2);
+                       Assert.assertTrue("The offset seems incorrect, got " + 
o3, o3 == TestPersistentKafkaSource.finalOffset[2]);
+               }
+               Assert.assertFalse("no offset has been set", 
TestPersistentKafkaSource.finalOffset[0] == 0 &&
+                                                                               
                        TestPersistentKafkaSource.finalOffset[1] == 0 &&
+                                                                               
                        TestPersistentKafkaSource.finalOffset[2] == 0);
+               LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
 
                LOG.info("Manipulating offsets");
-               // set the offset to 25, 50, and 75 for the three partitions
+               // set the offset to 50 for the three partitions
                PersistentKafkaSource.setOffset(zk, standardCC.groupId(), 
topicName, 0, 50);
                PersistentKafkaSource.setOffset(zk, standardCC.groupId(), 
topicName, 1, 50);
                PersistentKafkaSource.setOffset(zk, standardCC.groupId(), 
topicName, 2, 50);
@@ -283,20 +378,16 @@ public class KafkaITCase {
 
        private void readSequence(StreamExecutionEnvironment env, 
ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int 
valuesCount, final int finalCount) throws Exception {
                LOG.info("Reading sequence for verification until final count 
{}", finalCount);
-               DataStream<Tuple2<Integer, Integer>> source = env.addSource(
-                               new PersistentKafkaSource<Tuple2<Integer, 
Integer>>(topicName, new 
Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new 
Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
-               )
-                               //add a sleeper mapper. Since there is no good 
way of "shutting down" a running topology, we have
-                               // to play this trick. The problem is that we 
have to wait until all checkpoints are confirmed
-                               .map(new MapFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>() {
-                                       private static final long 
serialVersionUID = 1L;
-
-                                       @Override
-                                       public Tuple2<Integer, Integer> 
map(Tuple2<Integer, Integer> value) throws Exception {
-                                               Thread.sleep(150);
-                                               return value;
-                                       }
-                               }).setParallelism(3);
+               TestPersistentKafkaSource<Tuple2<Integer, Integer>> pks = new 
TestPersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new 
Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new 
Tuple2<Integer, Integer>(1, 1), env.getConfig()), cc);
+               DataStream<Tuple2<Integer, Integer>> source = 
env.addSource(pks).map(new MapFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>() {
+                       // we need to slow down the source so that it can 
participate in a few checkpoints.
+                       // Otherwise it would write its data into buffers and 
shut down.
+                       @Override
+                       public Tuple2<Integer, Integer> map(Tuple2<Integer, 
Integer> value) throws Exception {
+                               Thread.sleep(50);
+                               return value;
+                       }
+               });
 
                // verify data
                DataStream<Integer> validIndexes = source.flatMap(new 
RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
@@ -312,7 +403,6 @@ public class KafkaITCase {
 
                                LOG.info("Reader " + 
getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + 
count + "/" + finalCount);
                                // verify if we've seen everything
-                               
                                if (count == finalCount) {
                                        LOG.info("Received all values");
                                        for (int i = 0; i < values.length; i++) 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
new file mode 100644
index 0000000..32bd1d1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class UtilsTest {
+
+       /**
+        * Ensure that the returned byte array has the expected size
+        */
+       @Test
+       public void testTypeInformationSerializationSchema() {
+               final ExecutionConfig ec = new ExecutionConfig();
+
+               Tuple2<Integer, Integer> test = new Tuple2<Integer, 
Integer>(1,666);
+
+               Utils.TypeInformationSerializationSchema<Tuple2<Integer, 
Integer>> ser = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, 
Integer>>(test, ec);
+
+               byte[] res = ser.serialize(test);
+               Assert.assertEquals(8, res.length);
+
+               Tuple2<Integer, Integer> another = ser.deserialize(res);
+               Assert.assertEquals(test.f0, another.f0);
+               Assert.assertEquals(test.f1, another.f1);
+       }
+
+       @Test
+       public void testGrowing() {
+               final ExecutionConfig ec = new ExecutionConfig();
+
+               Tuple2<Integer, byte[]> test1 = new Tuple2<Integer, byte[]>(1, 
new byte[16]);
+
+               Utils.TypeInformationSerializationSchema<Tuple2<Integer, 
byte[]>> ser = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, 
byte[]>>(test1, ec);
+
+               byte[] res = ser.serialize(test1);
+               Assert.assertEquals(24, res.length);
+               Tuple2<Integer, byte[]> another = ser.deserialize(res);
+               Assert.assertEquals(16, another.f1.length);
+
+               test1 = new Tuple2<Integer, byte[]>(1, new byte[26]);
+
+               res = ser.serialize(test1);
+               Assert.assertEquals(34, res.length);
+               another = ser.deserialize(res);
+               Assert.assertEquals(26, another.f1.length);
+
+               test1 = new Tuple2<Integer, byte[]>(1, new byte[1]);
+
+               res = ser.serialize(test1);
+               Assert.assertEquals(9, res.length);
+               another = ser.deserialize(res);
+               Assert.assertEquals(1, another.f1.length);
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
deleted file mode 100644
index 306df71..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * This interface must be implemented by functions/operations that want to 
receive
- * a commit notification once a checkpoint has been completely acknowledged by 
all
- * participants.
- */
-public interface CheckpointCommitter {
-
-       /**
-        * This method is called as a notification once a distributed 
checkpoint has been completed.
-        * 
-        * Note that any exception during this method will not cause the 
checkpoint to
-        * fail any more.
-        * 
-        * @param checkpointId The ID of the checkpoint that has been completed.
-        * @param stateName The name of the committed state
-        * @param checkPointedState Handle to the state that was checkpointed 
with this checkpoint id.
-        * @throws Exception 
-        */
-       void commitCheckpoint(long checkpointId, String stateName, 
StateHandle<Serializable> checkPointedState) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
new file mode 100644
index 0000000..c2d2182
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.checkpoint;
+
+/**
+ * This interface must be implemented by functions/operations that want to 
receive
+ * a commit notification once a checkpoint has been completely acknowledged by 
all
+ * participants.
+ */
+public interface CheckpointNotifier {
+
+       /**
+        * This method is called as a notification once a distributed 
checkpoint has been completed.
+        * 
+        * Note that any exception during this method will not cause the 
checkpoint to
+        * fail any more.
+        * 
+        * @param checkpointId The ID of the checkpoint that has been completed.
+        * @throws Exception
+        */
+       void notifyCheckpointComplete(long checkpointId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 4d541bc..c988150 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -394,15 +394,16 @@ public class StreamingJobGraphGenerator {
                        List<JobVertexID> ackVertices = new 
ArrayList<JobVertexID>(jobVertices.size());
 
                        // collect the vertices that receive "commit 
checkpoint" messages
-                       // currently, these are only the sources
+                       // currently, these are all vertices
                        List<JobVertexID> commitVertices = new 
ArrayList<JobVertexID>();
                        
                        
                        for (JobVertex vertex : jobVertices.values()) {
                                if (vertex.isInputVertex()) {
                                        triggerVertices.add(vertex.getID());
-                                       commitVertices.add(vertex.getID());
                                }
+                               // TODO: add check whether the user function 
implements the checkpointing interface
+                               commitVertices.add(vertex.getID());
                                ackVertices.add(vertex.getID());
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index c128a7b..b2d9c91 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.PartitionedStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.state.StreamOperatorState;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -134,11 +134,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function & Serial
 
        }
 
-       public void confirmCheckpointCompleted(long checkpointId, String 
stateName,
-                       StateHandle<Serializable> checkpointedState) throws 
Exception {
-               if (userFunction instanceof CheckpointCommitter) {
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               if (userFunction instanceof CheckpointNotifier) {
                        try {
-                               ((CheckpointCommitter) 
userFunction).commitCheckpoint(checkpointId, stateName, checkpointedState);
+                               ((CheckpointNotifier) 
userFunction).notifyCheckpointComplete(checkpointId);
                        } catch (Exception e) {
                                throw new Exception("Error while confirming 
checkpoint " + checkpointId + " to the stream function", e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
index 6b5a3e8..afc36e0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
@@ -36,5 +36,5 @@ public interface StatefulStreamOperator<OUT> extends 
StreamOperator<OUT> {
 
        Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> 
getStateSnapshotFromFunction(long checkpointId, long timestamp) throws 
Exception;
 
-       void confirmCheckpointCompleted(long checkpointId, String stateName, 
StateHandle<Serializable> checkpointedState) throws Exception;
+       void notifyCheckpointComplete(long checkpointId) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7421a33..f98ed2d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
@@ -33,7 +32,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.state.FileStateHandle;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.PartitionedStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
@@ -52,7 +50,7 @@ import org.slf4j.LoggerFactory;
 
 
 public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends 
AbstractInvokable implements
-               OperatorStateCarrier<StateHandle<Serializable>>, 
CheckpointedOperator, CheckpointCommittingOperator {
+               OperatorStateCarrier<StateHandle<Serializable>>, 
CheckpointedOperator, CheckpointNotificationOperator {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamTask.class);
 
@@ -69,11 +67,11 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
        protected volatile boolean isRunning = false;
 
        protected List<StreamingRuntimeContext> contexts;
-       
+
        protected StreamingRuntimeContext headContext;
 
        protected ClassLoader userClassLoader;
-       
+
        private EventListener<TaskEvent> superstepListener;
 
        public StreamTask() {
@@ -86,11 +84,11 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
        public void registerInputOutput() {
                this.userClassLoader = getUserCodeClassLoader();
                this.configuration = new StreamConfig(getTaskConfiguration());
-               
+
                streamOperator = 
configuration.getStreamOperator(userClassLoader);
 
                outputHandler = new OutputHandler<OUT>(this);
-               
+
                if (streamOperator != null) {
                        // IterationHead and IterationTail don't have an 
Operator...
 
@@ -110,13 +108,13 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
        public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) {
                Environment env = getEnvironment();
                String operatorName = 
conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
-               
+
                KeySelector<?,Serializable> statePartitioner = 
conf.getStatePartitioner(userClassLoader);
-               
+
                return new StreamingRuntimeContext(operatorName, env, 
getUserCodeClassLoader(),
                                getExecutionConfig(), statePartitioner, 
getStateHandleProvider());
        }
-       
+
        private StateHandleProvider<Serializable> getStateHandleProvider() {
 
                StateHandleProvider<Serializable> provider = configuration
@@ -134,7 +132,7 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
                        } catch (Exception e) {
                                throw new RuntimeException(backendName + " is 
not a valid state backend.\nSupported backends: jobmanager, filesystem.");
                        }
-                       
+
                        switch (backend) {
                                case JOBMANAGER:
                                        LOG.info("State backend for state 
checkpoints is set to jobmanager.");
@@ -152,7 +150,7 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
                                default:
                                        throw new RuntimeException("Backend " + 
backend + " is not supported yet.");
                        }
-                       
+
                } else {
                        LOG.info("Using user defined state backend for 
streaming checkpoitns.");
                        return provider;
@@ -199,7 +197,7 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
        @SuppressWarnings("unchecked")
        @Override
        public void setInitialState(StateHandle<Serializable> stateHandle) 
throws Exception {
-               
+
                // We retrieve and restore the states for the chained operators.
                List<Tuple2<StateHandle<Serializable>, Map<String, 
PartitionedStateHandle>>> chainedStates = 
(List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) 
stateHandle.getState();
 
@@ -217,15 +215,15 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
 
        @Override
        public void triggerCheckpoint(long checkpointId, long timestamp) throws 
Exception {
-               
+
                synchronized (checkpointLock) {
                        if (isRunning) {
                                try {
                                        LOG.debug("Starting checkpoint {} on 
task {}", checkpointId, getName());
-                                       
+
                                        // We wrap the states of the chained 
operators in a list, marking non-stateful operators with null
                                        List<Tuple2<StateHandle<Serializable>, 
Map<String, PartitionedStateHandle>>> chainedStates = new 
ArrayList<Tuple2<StateHandle<Serializable>, Map<String, 
PartitionedStateHandle>>>();
-                                       
+
                                        // A wrapper handle is created for the 
List of statehandles
                                        WrapperStateHandle stateHandle;
                                        try {
@@ -240,17 +238,17 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
                                                                
chainedStates.add(null);
                                                        }
                                                }
-                                               
+
                                                stateHandle = 
CollectionUtils.exists(chainedStates,
                                                                
NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
                                        }
                                        catch (Exception e) {
                                                throw new Exception("Error 
while drawing snapshot of the user state.", e);
                                        }
-                       
+
                                        // now emit the checkpoint barriers
                                        
outputHandler.broadcastBarrier(checkpointId, timestamp);
-                                       
+
                                        // now confirm the checkpoint
                                        if (stateHandle == null) {
                                                
getEnvironment().acknowledgeCheckpoint(checkpointId);
@@ -270,44 +268,24 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs
 
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
-       public void confirmCheckpoint(long checkpointId, 
SerializedValue<StateHandle<?>> stateHandle) throws Exception {
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               // we do nothing here so far. this should call commit on the 
source function, for example
                synchronized (checkpointLock) {
-                       if (stateHandle != null) {
-                               List<Tuple2<StateHandle<Serializable>, 
Map<String, PartitionedStateHandle>>> chainedStates = 
(List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) 
stateHandle
-                                               
.deserializeValue(getUserCodeClassLoader()).getState();
-
-                               for (int i = 0; i < chainedStates.size(); i++) {
-                                       Tuple2<StateHandle<Serializable>, 
Map<String, PartitionedStateHandle>> chainedState = chainedStates
-                                                       .get(i);
-                                       StreamOperator<?> chainedOperator = 
outputHandler.getChainedOperators().get(i);
-
-                                       if (chainedState != null) {
-                                               if (chainedState.f0 != null) {
-                                                       
((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(
-                                                                       
checkpointId, null, chainedState.f0);
-                                               }
+                       if (streamOperator instanceof StatefulStreamOperator) {
+                               ((StatefulStreamOperator) 
streamOperator).notifyCheckpointComplete(checkpointId);
+                       }
 
-                                               if (chainedState.f1 != null) {
-                                                       if (chainedOperator 
instanceof StatefulStreamOperator) {
-                                                               for 
(Entry<String, PartitionedStateHandle> stateEntry : chainedState.f1
-                                                                               
.entrySet()) {
-                                                                       for 
(StateHandle<Serializable> handle : stateEntry.getValue()
-                                                                               
        .getState().values()) {
-                                                                               
((StatefulStreamOperator) chainedOperator)
-                                                                               
                .confirmCheckpointCompleted(checkpointId,
-                                                                               
                                stateEntry.getKey(), handle);
-                                                                       }
-                                                               }
-                                                       }
-                                               }
+                       if (hasChainedOperators) {
+                               for (StreamOperator<?> chainedOperator : 
outputHandler.getChainedOperators()) {
+                                       if (chainedOperator instanceof 
StatefulStreamOperator) {
+                                               ((StatefulStreamOperator) 
chainedOperator).notifyCheckpointComplete(checkpointId);
                                        }
                                }
                        }
-
                }
        }
-       
-       
+
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
index 0b686e5..fcee1b5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
@@ -17,7 +17,7 @@
 
################################################################################
 
 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
+log4j.rootLogger=INFO, A1
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index a826eff..1730c63 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -23,26 +23,35 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
 import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -87,9 +96,8 @@ public class StreamCheckpointingITCase {
                        fail("Failed to stop test cluster: " + e.getMessage());
                }
        }
-       
-       
-       
+
+
        /**
         * Runs the following program:
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 43285b5..23b8940 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -467,7 +467,11 @@ public abstract class YarnTestBase {
                        // check if thread died
                        if(!runner.isAlive()) {
                                sendOutput();
-                               Assert.fail("Runner thread died before the test 
was finished. Return value = " +runner.getReturnValue());
+                               if(runner.getReturnValue() != 0) {
+                                       Assert.fail("Runner thread died before 
the test was finished. Return value = " + runner.getReturnValue());
+                               } else {
+                                       LOG.info("Runner stopped earlier than 
expected with return value = 0");
+                               }
                        }
                }
                sendOutput();

Reply via email to