http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
deleted file mode 100644
index bf3b3f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.common.NotLeaderForPartitionException;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
-
-/**
- * Iterates the records received from a partition of a Kafka topic as byte arrays.
- *
- * This code is in parts based on https://gist.github.com/ashrithr/5811266.
- */
-public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
-
-       private List<String> hosts;
-       private String topic;
-       private int partition;
-       private long readOffset;
-       private transient SimpleConsumer consumer;
-       private List<String> replicaBrokers;
-       private String clientName;
-       private Broker leadBroker;
-       private final ConsumerConfig consumerConfig;
-
-       private KafkaOffset initialOffset;
-       private transient Iterator<MessageAndOffset> iter;
-       private transient FetchResponse fetchResponse;
-
-       /**
-        * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
-        * we use the so-called simple (low-level) Kafka API, thus connecting directly to one of the brokers.
-        *
-        * @param topic
-        *              Name of the topic to listen to
-        * @param partition
-        *              Partition in the chosen topic
-        * @param initialOffset
-        *              Offset to start consuming at
-        * @param kafkaTopicUtils
-        *              Util for receiving topic metadata
-        */
-       public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
-                       KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
-
-               Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
-               this.hosts = new ArrayList<String>(brokerAddresses);
-
-               this.consumerConfig = consumerConfig;
-               this.topic = topic;
-               this.partition = partition;
-
-               this.initialOffset = initialOffset;
-
-               this.replicaBrokers = new ArrayList<String>();
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Initializing a connection
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Initializes the connection by detecting the leading broker of
-        * the topic and establishing a connection to it.
-        */
-       public void initialize() {
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
-               }
-
-               PartitionMetadata metadata = getPartitionMetadata();
-
-               leadBroker = metadata.leader();
-               clientName = "Client_" + topic + "_" + partition;
-
-               consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
-
-               try {
-                       readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
-               } catch (NotLeaderForPartitionException e) {
-                       throw new RuntimeException("Unable to get offset",e);
-               }
-
-               try {
-                       resetFetchResponse(readOffset);
-               } catch (ClosedChannelException e) {
-                       if (LOG.isWarnEnabled()) {
-                               LOG.warn("Got ClosedChannelException, trying to find new leader.");
-                       }
-                       findNewLeader();
-               }
-       }
-
-       private PartitionMetadata getPartitionMetadata() {
-               PartitionMetadata metadata;
-               int retry = 0;
-               int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER__MS_DEFAULT);
-               do {
-                       metadata = findLeader(hosts, topic, partition);
-                       /*try {
-                               Thread.sleep(10000);
-                       } catch (InterruptedException e) {
-                               throw new RuntimeException("Establishing connection to Kafka failed", e);
-                       } */
-                       if(metadata == null) {
-                               retry++;
-                               if(retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) {
-                                       throw new RuntimeException("Tried finding a leader "+retry+" times without success");
-                               }
-                               LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far {}",waitTime, retry-1);
-                               try {
-                                       Thread.sleep(waitTime);
-                               } catch (InterruptedException e) {
-                                       throw new RuntimeException("Establishing connection to Kafka failed", e);
-                               }
-                       }
-               } while (metadata == null);
-
-               if (metadata.leader() == null) {
-                       throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
-               }
-
-               return metadata;
-       }
-
-       /**
-        * Sets the partition to read from.
-        *
-        * @param partition
-        *              partition number
-        */
-       public void setPartition(int partition) {
-               this.partition = partition;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Iterator methods
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Convenience method to emulate iterator behaviour.
-        *
-        * @return whether the iterator has a next element
-        */
-       public boolean hasNext() {
-               return true;
-       }
-
-       /**
-        * Returns the next message received from Kafka as a
-        * byte array.
-        *
-        * @return next message as a byte array.
-        */
-       public byte[] next() {
-               return nextWithOffset().getMessage();
-       }
-
-       public boolean fetchHasNext() {
-               synchronized (fetchResponse) {
-                       if (!iter.hasNext()) {
-                               try {
-                                       resetFetchResponse(readOffset);
-                               } catch (ClosedChannelException e) {
-                                       if (LOG.isWarnEnabled()) {
-                                               LOG.warn("Got ClosedChannelException, trying to find new leader.", e);
-                                       }
-                                       findNewLeader();
-                               }
-                               return iter.hasNext();
-                       } else {
-                               return true;
-                       }
-               }
-       }
-
-       /**
-        * Returns the next message and its offset received from
-        * Kafka encapsulated in a POJO.
-        *
-        * @return next message and its offset.
-        */
-       public MessageWithMetadata nextWithOffset() {
-
-               synchronized (fetchResponse) {
-                       if (!iter.hasNext()) {
-                               throw new RuntimeException(
-                                               "Trying to read when response is not fetched. Call fetchHasNext() first.");
-                       }
-
-                       MessageAndOffset messageAndOffset = iter.next();
-                       long currentOffset = messageAndOffset.offset();
-
-                       while (currentOffset < readOffset) {
-                               messageAndOffset = iter.next();
-                               currentOffset = messageAndOffset.offset();
-                       }
-
-                       readOffset = messageAndOffset.nextOffset();
-                       ByteBuffer payload = messageAndOffset.message().payload();
-
-                       byte[] bytes = new byte[payload.limit()];
-                       payload.get(bytes);
-
-                       return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition);
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Internal utilities
-       // --------------------------------------------------------------------------------------------
-
-       private void resetFetchResponse(long offset) throws ClosedChannelException {
-               FetchRequest req = new FetchRequestBuilder().clientId(clientName)
-                               .addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
-
-               fetchResponse = consumer.fetch(req);
-
-               if (fetchResponse.hasError()) {
-                       short code = fetchResponse.errorCode(topic, partition);
-
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
-                       }
-
-                       if (code == ErrorMapping.OffsetOutOfRangeCode()) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Asked for invalid offset {}", offset);
-                               }
-                               String reset = consumerConfig.autoOffsetReset();
-                               if(reset.equals("smallest")) {
-                                       LOG.info("Setting read offset to beginning (smallest)");
-                                       readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName);
-                               } else if(reset.equals("largest")) {
-                                       LOG.info("Setting read offset to current offset (largest)");
-                                       readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
-                               } else {
-                                       throw new RuntimeException("Unknown 'autooffset.reset' mode '"+reset+"' Supported values are 'smallest' and 'largest'.");
-                               }
-                       }
-
-                       findNewLeader();
-               }
-
-               iter = fetchResponse.messageSet(topic, partition).iterator();
-       }
-
-       private void findNewLeader() {
-               consumer.close();
-               consumer = null;
-               leadBroker = findNewLeader(leadBroker, topic, partition);
-               consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
-       }
-
-       private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) {
-
-               PartitionMetadata returnMetaData = null;
-               loop:
-               for (String address : addresses) {
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Trying to find leader via broker: {}", address);
-                       }
-
-                       String[] split = address.split(":");
-                       String host = split[0];
-                       int port = Integer.parseInt(split[1]);
-
-                       SimpleConsumer consumer = null;
-                       try {
-                               consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
-                               List<String> topics = Collections.singletonList(topic);
-
-                               TopicMetadataRequest req = new TopicMetadataRequest(topics);
-
-                               kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-                               List<TopicMetadata> metaData = resp.topicsMetadata();
-                               for (TopicMetadata item : metaData) {
-                                       for (PartitionMetadata part : item.partitionsMetadata()) {
-                                               if (part.partitionId() == partition) {
-                                                       returnMetaData = part;
-                                                       break loop;
-                                               }
-                                       }
-                               }
-                       } catch (Exception e) {
-                               if (e instanceof ClosedChannelException) {
-                                       LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
-                                                       "[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition);
-                               } else {
-                                       throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e);
-                               }
-                       } finally {
-                               if (consumer != null) {
-                                       consumer.close();
-                               }
-                       }
-               }
-               if (returnMetaData != null) {
-                       replicaBrokers.clear();
-                       for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
-                               replicaBrokers.add(replica.host() + ":" + replica.port());
-                       }
-               }
-               return returnMetaData;
-       }
-
-       private Broker findNewLeader(Broker oldLeader, String topic, int a_partition) {
-               for (int i = 0; i < 3; i++) {
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info("Trying to find a new leader after Broker failure.");
-                       }
-                       boolean goToSleep = false;
-                       PartitionMetadata metadata = findLeader(replicaBrokers, topic, a_partition);
-                       if (metadata == null) {
-                               goToSleep = true;
-                       } else if (metadata.leader() == null) {
-                               goToSleep = true;
-                       } else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
-                               // first time through if the leader hasn't changed give ZooKeeper a second to recover
-                               // second time, assume the broker did recover before failover, or it was a non-Broker issue
-                               //
-                               goToSleep = true;
-                       } else {
-                               return metadata.leader();
-                       }
-                       if (goToSleep) {
-                               try {
-                                       Thread.sleep(10000);
-                               } catch (InterruptedException ie) {
-                               }
-                       }
-               }
-               throw new RuntimeException("Unable to find new leader after Broker failure.");
-       }
-
-       public int getId() {
-               return this.partition;
-       }
-
-       @Override
-       public String toString() {
-               return "SinglePartitionIterator{partition="+partition+" readOffset="+readOffset+"}";
-       }
-}
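
For readers unfamiliar with the low-level consumer API this deleted iterator was built on, here is a minimal sketch of its fetch loop, assembled from the code above (imports as listed at the top of the deleted file; host, port, topic, partition and client name are placeholder assumptions, not values from the patch):

    // Sketch of the fetch loop implemented by the deleted iterator, using Kafka's
    // legacy low-level SimpleConsumer API. "localhost:9092", "myTopic", partition 0
    // and the client name are placeholders.
    SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 30000, 65536, "Client_myTopic_0");
    long readOffset = new CurrentOffset().getOffset(consumer, "myTopic", 0, "Client_myTopic_0");
    FetchRequest req = new FetchRequestBuilder().clientId("Client_myTopic_0")
            .addFetch("myTopic", 0, readOffset, 1024 * 1024).build();
    FetchResponse fetchResponse = consumer.fetch(req);
    Iterator<MessageAndOffset> iter = fetchResponse.messageSet("myTopic", 0).iterator();
    while (iter.hasNext()) {
        MessageAndOffset messageAndOffset = iter.next();
        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);                          // the raw record handed to the source
        readOffset = messageAndOffset.nextOffset();  // continue from here on the next fetch
    }
    consumer.close();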

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
deleted file mode 100644
index 15e7b36..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class BeginningOffset extends KafkaOffset {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-               return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
deleted file mode 100644
index 6119f32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class CurrentOffset extends KafkaOffset {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-               return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
deleted file mode 100644
index 3aec7ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Offset given by a message read from Kafka.
- */
-public class GivenOffset extends KafkaOffset {
-
-       private static final long serialVersionUID = 1L;
-       private final long offset;
-
-       public GivenOffset(long offset) {
-               this.offset = offset;
-       }
-
-       @Override
-       public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-               return offset;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
deleted file mode 100644
index 2eaa2b8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Superclass for various kinds of KafkaOffsets.
- */
-public abstract class KafkaOffset implements Serializable {
-
-       private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
-
-       private static final long serialVersionUID = 1L;
-
-       public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
-                       String clientName);
-
-       /**
-        * Requests the last available offset for the given topic partition from the broker.
-        * @param consumer consumer connected to the lead broker
-        * @param topic topic to query
-        * @param partition partition to query
-        * @param whichTime Type of offset request (latest time / earliest time)
-        * @param clientName client id used for the request
-        * @return the offset returned by the broker
-        */
-       protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
-                       long whichTime, String clientName) {
-               TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-               Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-               requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-
-               kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
-                               kafka.api.OffsetRequest.CurrentVersion(), clientName);
-               OffsetResponse response = consumer.getOffsetsBefore(request);
-
-               while (response.hasError()) {
-                       int errorCode = response.errorCode(topic, partition);
-                       LOG.warn("Response has error. Error code "+errorCode);
-                       switch (errorCode) {
-                               case 6:
-                               case 3:
-                                       LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
-                                       break;
-                               default:
-                                       throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode);
-                       }
-
-                       request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-                       response = consumer.getOffsetsBefore(request);
-               }
-
-               long[] offsets = response.offsets(topic, partition);
-               if(offsets.length > 1) {
-                       LOG.warn("The offset response unexpectedly contained more than one offset: "+ Arrays.toString(offsets) + " Using only first one");
-               }
-               return offsets[0];
-       }
-
-}
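
The subclasses above differ only in the timestamp they pass to getLastOffset(). A brief usage sketch, assuming a connected SimpleConsumer and a boolean fromBeginning flag (both placeholders), with topic, partition and client name likewise invented for illustration:

    // Pick a starting position; all variants resolve to a concrete offset.
    KafkaOffset start = fromBeginning ? new BeginningOffset() : new CurrentOffset();
    // or: new GivenOffset(1337L) to resume from a previously recorded offset
    long readOffset = start.getOffset(consumer, "myTopic", 0, "Client_myTopic_0");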

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
deleted file mode 100644
index 02c49df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-/**
- * Enum controlling the offset behavior of the PersistentKafkaSource.
- */
-public enum Offset {
-       /**
-        * Read the Kafka topic from the beginning
-        */
-       FROM_BEGINNING,
-       /**
-        * Read the topic from the current offset. (Default).
-        */
-       FROM_CURRENT
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 87c6a34..246756c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -25,14 +25,20 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
 import kafka.consumer.ConsumerConfig;
-import org.apache.commons.lang.SerializationUtils;
+import kafka.network.SocketServer;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -40,25 +46,25 @@ import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -66,6 +72,7 @@ import org.slf4j.LoggerFactory;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
+import scala.collection.Seq;
 
 /**
  * Code in this test is based on the following GitHub repository:
@@ -91,9 +98,15 @@ public class KafkaITCase {
 
        private static TestingServer zookeeper;
        private static List<KafkaServer> brokers;
+       private static String brokerConnectionStrings = "";
 
        private static boolean shutdownKafkaBroker;
 
+       private static ConsumerConfig standardCC;
+
+       private static ZkClient zkClient;
+
+
        @BeforeClass
        public static void prepare() throws IOException {
                LOG.info("Starting KafkaITCase.prepare()");
@@ -118,6 +131,12 @@ public class KafkaITCase {
                        brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
                        for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
                                brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+                               SocketServer socketServer = brokers.get(i).socketServer();
+                               String host = "localhost";
+                               if(socketServer.host() != null) {
+                                       host = socketServer.host();
+                               }
+                               brokerConnectionStrings += host+":"+socketServer.port()+",";
                        }
 
                        LOG.info("ZK and KafkaServer started.");
@@ -125,6 +144,17 @@ public class KafkaITCase {
                        LOG.warn("Test failed with exception", t);
                        Assert.fail("Test failed with: " + t.getMessage());
                }
+
+               Properties cProps = new Properties();
+               cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+               cProps.setProperty("group.id", "flink-tests");
+               cProps.setProperty("auto.commit.enable", "false");
+
+               cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
+
+               standardCC = new ConsumerConfig(cProps);
+
+               zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
        }
 
        @AfterClass
@@ -142,8 +172,196 @@ public class KafkaITCase {
                                LOG.warn("ZK.stop() failed", e);
                        }
                }
+               zkClient.close();
+       }
+
+
+       @Test
+       public void testOffsetManipulation() {
+               ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+               final String topicName = "testOffsetManipulation";
+
+               // create topic
+               Properties topicConfig = new Properties();
+               LOG.info("Creating topic {}", topicName);
+               AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+               PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
+
+               Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+
+               zk.close();
+       }
+       /**
+        * We want to use the High level java consumer API but manage the offset in Zookeeper manually.
+        *
+        */
+       @Test
+       public void testPersistentSourceWithOffsetUpdates() throws Exception {
+               LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
+
+               ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+               final String topicName = "testOffsetHacking";
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
+               env.getConfig().disableSysoutLogging();
+               env.enableCheckpointing(50);
+               env.setNumberOfExecutionRetries(0);
+
+               // create topic
+               Properties topicConfig = new Properties();
+               LOG.info("Creating topic {}", topicName);
+               AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+               // write a sequence from 0 to 99 to each of the three partitions.
+               writeSequence(env, topicName, 0, 99);
+
+               readSequence(env, standardCC, topicName, 0, 100, 300);
+
+               // check offsets
+               Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+               Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
+               Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));
+
+
+               LOG.info("Manipulating offsets");
+               // set the offset to 50 for each of the three partitions
+               PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
+               PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
+               PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
+
+               // create new env
+               env = StreamExecutionEnvironment.createLocalEnvironment(3);
+               env.getConfig().disableSysoutLogging();
+               readSequence(env, standardCC, topicName, 50, 50, 150);
+
+               zk.close();
+
+               LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
+       }
+
+       private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+               LOG.info("Reading sequence for verification until final count {}", finalCount);
+               DataStream<Tuple2<Integer, Integer>> source = env.addSource(
+                               new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
+               )
+               //add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have
+               // to play this trick. The problem is that we have to wait until all checkpoints are confirmed
+               .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+                       @Override
+                       public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+                               Thread.sleep(75);
+                               return value;
+                       }
+               }).setParallelism(3);
+
+               // verify data
+               DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+                       int[] values = new int[valuesCount];
+                       int count = 0;
+
+                       @Override
+                       public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+                               values[value.f1 - valuesStartFrom]++;
+                               count++;
+
+                               LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
+                               // verify if we've seen everything
+
+                               if (count == finalCount) {
+                                       LOG.info("Received all values");
+                                       for (int i = 0; i < values.length; i++) {
+                                               int v = values[i];
+                                               if (v != 3) {
+                                                       throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+                                               }
+                                       }
+                                       // test has passed
+                                       throw new SuccessException();
+                               }
+                       }
+
+               }).setParallelism(1);
+
+               tryExecute(env, "Read data from Kafka");
+
+               LOG.info("Successfully read sequence for verification");
+       }
+
+
+
+       private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
+               LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+               DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+                       boolean running = true;
+
+                       @Override
+                       public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+                               LOG.info("Starting source.");
+                               int cnt = from;
+                               int partition = getRuntimeContext().getIndexOfThisSubtask();
+                               while (running) {
+                                       LOG.info("Writing " + cnt + " to partition " + partition);
+                                       collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
+                                       if (cnt == to) {
+                                               LOG.info("Writer reached end.");
+                                               return;
+                                       }
+                                       cnt++;
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               LOG.info("Source got cancel()");
+                               running = false;
+                       }
+               }).setParallelism(3);
+               stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+                               topicName,
+                               new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+                               new T2Partitioner()
+               )).setParallelism(3);
+               env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
+               LOG.info("Finished writing sequence");
+       }
+
+       private static class T2Partitioner implements SerializableKafkaPartitioner {
+               @Override
+               public int partition(Object key, int numPartitions) {
+                       if(numPartitions != 3) {
+                               throw new IllegalArgumentException("Expected three partitions");
+                       }
+                       Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+                       return element.f0;
+               }
+       }
+
+       public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+               try {
+                       see.execute(name);
+               } catch (JobExecutionException good) {
+                       Throwable t = good.getCause();
+                       int limit = 0;
+                       while (!(t instanceof SuccessException)) {
+                               if(t == null) {
+                                       LOG.warn("Test failed with exception", good);
+                                       Assert.fail("Test failed with: " + good.getMessage());
+                               }
+                               
+                               t = t.getCause();
+                               if (limit++ == 20) {
+                                       LOG.warn("Test failed with exception", good);
+                                       Assert.fail("Test failed with: " + good.getMessage());
+                               }
+                       }
+               }
        }
 
+
        @Test
        public void regularKafkaSourceTest() throws Exception {
                LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
@@ -152,10 +370,9 @@ public class KafkaITCase {
                createTestTopic(topic, 1, 1);
 
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
                // add consuming topology:
                DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-                               new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000));
+                               new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
                consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
                        int elCnt = 0;
                        int start = -1;
@@ -210,22 +427,9 @@ public class KafkaITCase {
                                running = false;
                        }
                });
-               stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+               stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
 
-               try {
-                       env.setParallelism(1);
-                       env.execute();
-               } catch (JobExecutionException good) {
-                       Throwable t = good.getCause();
-                       int limit = 0;
-                       while (!(t instanceof SuccessException)) {
-                               t = t.getCause();
-                               if (limit++ == 20) {
-                                       LOG.warn("Test failed with exception", good);
-                                       Assert.fail("Test failed with: " + good.getMessage());
-                               }
-                       }
-               }
+               tryExecute(env, "regular kafka source test");
 
                LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
        }
@@ -241,7 +445,10 @@ public class KafkaITCase {
 
                // add consuming topology:
                DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-                               new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+                               new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+                                               new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+                                               standardCC
+                               ));
                consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
                        int elCnt = 0;
                        int start = -1;
@@ -304,22 +511,9 @@ public class KafkaITCase {
                                running = false;
                        }
                });
-               stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+               stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
 
-               try {
-                       env.setParallelism(1);
-                       env.execute();
-               } catch (JobExecutionException good) {
-                       Throwable t = good.getCause();
-                       int limit = 0;
-                       while (!(t instanceof SuccessException)) {
-                               t = t.getCause();
-                               if (limit++ == 20) {
-                                       LOG.warn("Test failed with exception", good);
-                                       Assert.fail("Test failed with: " + good.getMessage());
-                               }
-                       }
-               }
+               tryExecute(env, "tupletesttopology");
 
                LOG.info("Finished KafkaITCase.tupleTestTopology()");
        }
@@ -347,16 +541,19 @@ public class KafkaITCase {
                consumerProps.setProperty("fetch.message.max.bytes", 
Integer.toString(1024 * 1024 * 30));
                consumerProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
                consumerProps.setProperty("group.id", "test");
+               consumerProps.setProperty("auto.commit.enable", "false");
+               consumerProps.setProperty("auto.offset.reset", "smallest");
 
                ConsumerConfig cc = new ConsumerConfig(consumerProps);
                DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
-                               new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, Offset.FROM_BEGINNING, cc));
+                               new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
 
                consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
                        int elCnt = 0;
 
                        @Override
                        public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+                               LOG.info("Received {}", value.f0);
                                elCnt++;
                                if(value.f0 == -1) {
                                        // we should have seen 11 elements now.
@@ -370,7 +567,7 @@ public class KafkaITCase {
                                        throw new RuntimeException("More than 
10 elements seen: "+elCnt);
                                }
                        }
-               });
+               }).setParallelism(1);
 
                // add producing topology
                DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -398,6 +595,7 @@ public class KafkaITCase {
                                        } catch (InterruptedException ignored) {
                                        }
                                        if(cnt == 10) {
+                                               LOG.info("Send end signal");
                                                // signal end
                                                collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
                                                running = false;
@@ -412,31 +610,16 @@ public class KafkaITCase {
                        }
                });
 
-               stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(zookeeperConnectionString, topic,
+               stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
                                new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
                );
 
-               try {
-                       env.setParallelism(1);
-                       env.execute();
-               } catch (JobExecutionException good) {
-                       Throwable t = good.getCause();
-                       int limit = 0;
-                       while (!(t instanceof SuccessException)) {
-                               t = t.getCause();
-                               if (limit++ == 20) {
-                                       LOG.warn("Test failed with exception", 
good);
-                                       Assert.fail("Test failed with: " + 
good.getMessage());
-                               }
-                       }
-               }
+               tryExecute(env, "big topology test");
 
                LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
        }
 
 
-       private static boolean partitionerHasBeenCalled = false;
-
        @Test
        public void customPartitioningTestTopology() throws Exception {
                LOG.info("Starting 
KafkaITCase.customPartitioningTestTopology()");
@@ -449,7 +632,9 @@ public class KafkaITCase {
 
                // add consuming topology:
                DataStreamSource<Tuple2<Long, String>> consuming = 
env.addSource(
-                               new PersistentKafkaSource<Tuple2<Long, 
String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 
5000, 100, Offset.FROM_BEGINNING));
+                               new PersistentKafkaSource<Tuple2<Long, 
String>>(topic,
+                                               new 
Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, 
String>(1L, ""), env.getConfig()),
+                                               standardCC));
                consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
                        int start = -1;
                        BitSet validator = new BitSet(101);
@@ -519,23 +704,9 @@ public class KafkaITCase {
                                running = false;
                        }
                });
-               stream.addSink(new KafkaSink<Tuple2<Long, 
String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new 
CustomPartitioner()));
-
-               try {
-                       env.setParallelism(1);
-                       env.execute();
-               } catch (JobExecutionException good) {
-                       Throwable t = good.getCause();
-                       int limit = 0;
-                       while (!(t instanceof SuccessException)) {
-                               t = t.getCause();
-                               if (limit++ == 20) {
-                                       throw good;
-                               }
-                       }
+               stream.addSink(new KafkaSink<Tuple2<Long, 
String>>(brokerConnectionStrings, topic, new 
Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, 
String>(1L, ""), env.getConfig()), new CustomPartitioner()));
 
-                       assertTrue(partitionerHasBeenCalled);
-               }
+               tryExecute(env, "custom partitioning test");
 
                LOG.info("Finished 
KafkaITCase.customPartitioningTestTopology()");
        }
@@ -547,7 +718,6 @@ public class KafkaITCase {
 
                @Override
                public int partition(Object key, int numPartitions) {
-                       partitionerHasBeenCalled = true;
 
                        @SuppressWarnings("unchecked")
                        Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
@@ -561,25 +731,6 @@ public class KafkaITCase {
                }
        }
 
-       private static class TupleSerializationSchema implements 
DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, 
String>, byte[]> {
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public Tuple2<Long, String> deserialize(byte[] message) {
-                       Object deserializedObject = 
SerializationUtils.deserialize(message);
-                       return (Tuple2<Long, String>) deserializedObject;
-               }
-
-               @Override
-               public byte[] serialize(Tuple2<Long, String> element) {
-                       return SerializationUtils.serialize(element);
-               }
-
-               @Override
-               public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
-                       return false;
-               }
-       }
 
        @Test
        public void simpleTestTopology() throws Exception {
@@ -591,7 +742,7 @@ public class KafkaITCase {
 
                // add consuming topology:
                DataStreamSource<String> consuming = env.addSource(
-                               new 
PersistentKafkaSource<String>(zookeeperConnectionString, topic, new 
JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
+                               new PersistentKafkaSource<String>(topic, new 
JavaDefaultStringSchema(), standardCC));
                consuming.addSink(new SinkFunction<String>() {
                        int elCnt = 0;
                        int start = -1;
@@ -643,34 +794,34 @@ public class KafkaITCase {
                                running = false;
                        }
                });
-               stream.addSink(new KafkaSink<String>(zookeeperConnectionString, 
topic, new JavaDefaultStringSchema()));
+               stream.addSink(new KafkaSink<String>(brokerConnectionStrings, 
topic, new JavaDefaultStringSchema()));
 
-               try {
-                       env.setParallelism(1);
-                       env.execute();
-               } catch (JobExecutionException good) {
-                       Throwable t = good.getCause();
-                       int limit = 0;
-                       while (!(t instanceof SuccessException)) {
-                               t = t.getCause();
-                               if (limit++ == 20) {
-                                       LOG.warn("Test failed with exception", 
good);
-                                       Assert.fail("Test failed with: " + 
good.getMessage());
-                               }
-                       }
-               }
+               tryExecute(env, "simpletest");
        }
 
        private static boolean leaderHasShutDown = false;
 
-       @Test
+       @Test(timeout=60000)
        public void brokerFailureTest() throws Exception {
                String topic = "brokerFailureTestTopic";
 
                createTestTopic(topic, 2, 2);
 
-               KafkaTopicUtils kafkaTopicUtils = new 
KafkaTopicUtils(zookeeperConnectionString);
-               final String leaderToShutDown = 
kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 
0).leader().get().connectionString();
+       //      KafkaTopicUtils kafkaTopicUtils = new 
KafkaTopicUtils(zookeeperConnectionString);
+       //      final String leaderToShutDown = 
kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 
0).leader().get().connectionString();
+
+               PartitionMetadata firstPart = null;
+               do {
+                       if(firstPart != null) {
+                               LOG.info("Unable to find leader. error code 
{}", firstPart.errorCode());
+                               // not the first try. Sleep a bit
+                               Thread.sleep(150);
+                       }
+                       Seq<PartitionMetadata> partitionMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+                       firstPart = partitionMetadata.head();
+               } while(firstPart.errorCode() != 0);
+
+               final String leaderToShutDown = 
firstPart.leader().get().connectionString();
 
                final Thread brokerShutdown = new Thread(new Runnable() {
                        @Override
@@ -704,7 +855,7 @@ public class KafkaITCase {
 
                // add consuming topology:
                DataStreamSource<String> consuming = env.addSource(
-                               new 
PersistentKafkaSource<String>(zookeeperConnectionString, topic, new 
JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING));
+                               new PersistentKafkaSource<String>(topic, new 
JavaDefaultStringSchema(), standardCC));
                consuming.setParallelism(1);
 
                consuming.addSink(new SinkFunction<String>() {
@@ -717,7 +868,7 @@ public class KafkaITCase {
 
                        @Override
                        public void invoke(String value) throws Exception {
-                               LOG.info("Got message = " + value + " leader 
has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ 
numOfMessagesToBeCorrect);
+                               LOG.info("Got message = " + value + " leader 
has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + 
numOfMessagesToBeCorrect);
                                String[] sp = value.split("-");
                                int v = Integer.parseInt(sp[1]);
 
@@ -736,8 +887,8 @@ public class KafkaITCase {
                                        shutdownKafkaBroker = true;
                                }
 
-                               if(leaderHasShutDown) { // it only makes sence 
to check once the shutdown is completed
-                                       if (elCnt >= stopAfterMessages ) {
+                               if (leaderHasShutDown) { // it only makes sence 
to check once the shutdown is completed
+                                       if (elCnt >= stopAfterMessages) {
                                                // check if everything in the 
bitset is set to true
                                                int nc;
                                                if ((nc = 
validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
@@ -779,29 +930,18 @@ public class KafkaITCase {
                                running = false;
                        }
                });
-               stream.addSink(new KafkaSink<String>(zookeeperConnectionString, 
topic, new JavaDefaultStringSchema()))
+               stream.addSink(new KafkaSink<String>(brokerConnectionStrings, 
topic, new JavaDefaultStringSchema()))
                                .setParallelism(1);
 
-               try {
-                       env.setParallelism(1);
-                       env.execute();
-               } catch (JobExecutionException good) {
-                       Throwable t = good.getCause();
-                       int limit = 0;
-                       while (!(t instanceof SuccessException)) {
-                               t = t.getCause();
-                               if (limit++ == 20) {
-                                       LOG.warn("Test failed with exception", 
good);
-                                       Assert.fail("Test failed with: " + 
good.getMessage());
-                               }
-                       }
-               }
+               tryExecute(env, "broker failure test");
        }
 
 
        private void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor) {
-               KafkaTopicUtils kafkaTopicUtils = new 
KafkaTopicUtils(zookeeperConnectionString);
-               kafkaTopicUtils.createTopic(topic, numberOfPartitions, 
replicationFactor);
+               // create topic
+               Properties topicConfig = new Properties();
+               LOG.info("Creating topic {}", topic);
+               AdminUtils.createTopic(zkClient, topic, numberOfPartitions, 
replicationFactor, topicConfig);
        }
 
        private static TestingServer getZookeeper() throws Exception {

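The KafkaITCase changes above replace the old ZooKeeper-/Offset-based constructors with the ConsumerConfig-based PersistentKafkaSource and the broker-list KafkaSink, and collapse the hand-rolled SuccessException unwrapping into the tryExecute helper. A minimal sketch of a topology wired up against the new API is shown below; the topic name, the broker and ZooKeeper addresses and the parallelism are placeholders, tryExecute refers to the test helper introduced elsewhere in this patch, and imports are omitted.

        Properties consumerProps = new Properties();
        consumerProps.setProperty("zookeeper.connect", "localhost:2181");   // placeholder address
        consumerProps.setProperty("group.id", "test");
        consumerProps.setProperty("auto.commit.enable", "false");
        consumerProps.setProperty("auto.offset.reset", "smallest");
        ConsumerConfig cc = new ConsumerConfig(consumerProps);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // consumer: topic + schema + ConsumerConfig, no explicit Offset argument anymore
        DataStreamSource<String> consuming = env.addSource(
                        new PersistentKafkaSource<String>("sketchTopic", new JavaDefaultStringSchema(), cc));
        consuming.print();

        // producer: the sink now takes the broker list instead of the ZooKeeper connection string
        env.fromElements("a", "b", "c")
                        .addSink(new KafkaSink<String>("localhost:9092", "sketchTopic", new JavaDefaultStringSchema()));

        tryExecute(env, "sketch topology");   // test helper from this patch, unwraps the SuccessException
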
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
deleted file mode 100644
index 5f0e198..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.PartitionMetadata;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-public class KafkaTopicUtilsTest {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
-       private static final int NUMBER_OF_BROKERS = 2;
-       private static final String TOPIC = "myTopic";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Test
-       public void test() {
-               int zkPort;
-               String kafkaHost;
-               String zookeeperConnectionString;
-
-               File tmpZkDir;
-               List<File> tmpKafkaDirs;
-               Map<String, KafkaServer> kafkaServers = null;
-               TestingServer zookeeper = null;
-
-               try {
-                       tmpZkDir = tempFolder.newFolder();
-
-                       tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
-                       for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
-                               tmpKafkaDirs.add(tempFolder.newFolder());
-                       }
-
-                       zkPort = NetUtils.getAvailablePort();
-                       kafkaHost = InetAddress.getLocalHost().getHostName();
-                       zookeeperConnectionString = "localhost:" + zkPort;
-
-                       // init zookeeper
-                       zookeeper = new TestingServer(zkPort, tmpZkDir);
-
-                       // init kafka kafkaServers
-                       kafkaServers = new HashMap<String, KafkaServer>();
-
-                       for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
-                               KafkaServer kafkaServer = 
getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
-                               
kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + 
kafkaServer.config().advertisedPort(), kafkaServer);
-                       }
-
-                       // create Kafka topic
-                       final KafkaTopicUtils kafkaTopicUtils = new 
KafkaTopicUtils(zookeeperConnectionString);
-                       kafkaTopicUtils.createTopic(TOPIC, 1, 2);
-
-                       // check whether topic exists
-                       assertTrue(kafkaTopicUtils.topicExists(TOPIC));
-
-                       // check number of partitions
-                       assertEquals(1, 
kafkaTopicUtils.getNumberOfPartitions(TOPIC));
-
-                       // get partition metadata without error
-                       PartitionMetadata partitionMetadata = 
kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
-                       assertEquals(0, partitionMetadata.errorCode());
-
-                       // get broker list
-                       assertEquals(new 
HashSet<String>(kafkaServers.keySet()), 
kafkaTopicUtils.getBrokerAddresses(TOPIC));
-               } catch (IOException e) {
-                       fail(e.toString());
-               } catch (Exception e) {
-                       fail(e.toString());
-               } finally {
-                       LOG.info("Shutting down all services");
-                       for (KafkaServer broker : kafkaServers.values()) {
-                               if (broker != null) {
-                                       broker.shutdown();
-                               }
-                       }
-
-                       if (zookeeper != null) {
-                               try {
-                                       zookeeper.stop();
-                               } catch (IOException e) {
-                                       LOG.warn("ZK.stop() failed", e);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Copied from 
com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-        */
-       private static KafkaServer getKafkaServer(String kafkaHost, String 
zookeeperConnectionString, int brokerId, File tmpFolder) throws 
UnknownHostException {
-               Properties kafkaProperties = new Properties();
-
-               int kafkaPort = NetUtils.getAvailablePort();
-
-               // properties have to be Strings
-               kafkaProperties.put("advertised.host.name", kafkaHost);
-               kafkaProperties.put("port", Integer.toString(kafkaPort));
-               kafkaProperties.put("broker.id", Integer.toString(brokerId));
-               kafkaProperties.put("log.dir", tmpFolder.toString());
-               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
-               KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-               KafkaServer server = new KafkaServer(kafkaConfig, new 
KafkaLocalSystemTime());
-               server.startup();
-               return server;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
index dc20726..9ede613 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 196f7ec..4bd89c4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
+import java.io.Serializable;
+
 /**
  * This interface marks a function/operator as <i>asynchronously 
checkpointed</i>.
  * Similar to the {@link Checkpointed} interface, the function must produce a
@@ -32,4 +34,4 @@ package org.apache.flink.streaming.api.checkpoint;
  * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
  * of the actual state.</p>
  */
-public interface CheckpointedAsynchronously extends Checkpointed {}
+public interface CheckpointedAsynchronously<T extends Serializable> extends 
Checkpointed<T> {}

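CheckpointedAsynchronously is now generic over the state type, mirroring Checkpointed<T>. A minimal sketch of a user function using it could look like the following; the snapshotState(long, long)/restoreState signatures are assumed from the Checkpointed contract referenced in the Javadoc, the counting logic is purely illustrative, and imports are omitted.

        public class CountingMapper implements MapFunction<String, String>,
                        CheckpointedAsynchronously<Long> {

                private long count = 0;   // the state that gets checkpointed

                @Override
                public String map(String value) {
                        count++;
                        return value;
                }

                @Override
                public Long snapshotState(long checkpointId, long checkpointTimestamp) {
                        return count;   // hand out a copy of the state, not a live reference
                }

                @Override
                public void restoreState(Long state) {
                        count = state;
                }
        }
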
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index a59407c..3efad93 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -30,8 +30,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         */
        @Override
        public JobExecutionResult execute() throws Exception {
-               return 
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
-                                                                               
                                                
getConfig().isSysoutLoggingEnabled());
+               return execute(DEFAULT_JOB_NAME);
        }
 
        /**
@@ -44,7 +43,9 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         */
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
-               return 
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), 
getParallelism(),
-                                                                               
                                                                
getConfig().isSysoutLoggingEnabled());
+               JobExecutionResult result = 
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), 
getParallelism(),
+                               getConfig().isSysoutLoggingEnabled());
+               streamGraph.clear(); // clear graph to allow submitting another 
job via the same environment.
+               return result;
        }
 }

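With this change execute() simply delegates to execute(DEFAULT_JOB_NAME), and the stream graph is cleared after each submission, so one LocalStreamEnvironment can run several jobs in sequence. A small illustration with placeholder jobs (imports omitted):

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        env.fromElements(1, 2, 3).print();
        env.execute("first job");            // runs on the mini cluster

        // the graph was cleared after the first submission, so the second job
        // does not pick up the nodes of the first one
        env.fromElements(4, 5, 6).print();
        env.execute("second job");
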
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b0471f9..2e33b82 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -49,7 +50,6 @@ import 
org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
-import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
@@ -67,6 +67,8 @@ import com.esotericsoftware.kryo.Serializer;
  */
 public abstract class StreamExecutionEnvironment {
 
+       public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
        private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
 
        private long bufferTimeout = 100;
@@ -624,8 +626,8 @@ public abstract class StreamExecutionEnvironment {
 
                TypeInformation<OUT> outTypeInfo;
 
-               if (function instanceof GenericSourceFunction) {
-                       outTypeInfo = ((GenericSourceFunction<OUT>) 
function).getType();
+               if (function instanceof ResultTypeQueryable) {
+                       outTypeInfo = ((ResultTypeQueryable<OUT>) 
function).getProducedType();
                } else {
                        try {
                                outTypeInfo = 
TypeExtractor.createTypeInfo(SourceFunction.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
deleted file mode 100644
index 0113cfe..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public interface GenericSourceFunction<T> {
-
-       TypeInformation<T> getType();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 93bf8eb..271c05c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -60,8 +60,7 @@ import org.slf4j.LoggerFactory;
 public class StreamGraph extends StreamingPlan {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraph.class);
-       private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
-       private String jobName = DEAFULT_JOB_NAME;
+       private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
 
        private final StreamExecutionEnvironment environemnt;
        private final ExecutionConfig executionConfig;
@@ -70,17 +69,25 @@ public class StreamGraph extends StreamingPlan {
        private long checkpointingInterval = 5000;
        private boolean chaining = true;
 
-       private final Map<Integer, StreamNode> streamNodes;
-       private final Set<Integer> sources;
+       private Map<Integer, StreamNode> streamNodes;
+       private Set<Integer> sources;
 
-       private final Map<Integer, StreamLoop> streamLoops;
-       protected final Map<Integer, StreamLoop> vertexIDtoLoop;
+       private Map<Integer, StreamLoop> streamLoops;
+       protected Map<Integer, StreamLoop> vertexIDtoLoop;
 
        public StreamGraph(StreamExecutionEnvironment environment) {
 
                this.environemnt = environment;
                executionConfig = environment.getConfig();
 
+               // create an empty new stream graph.
+               clear();
+       }
+
+       /**
+        * Remove all registered nodes etc.
+        */
+       public void clear() {
                streamNodes = new HashMap<Integer, StreamNode>();
                streamLoops = new HashMap<Integer, StreamLoop>();
                vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f66e394..24a08eb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -33,7 +33,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,8 +137,7 @@ public abstract class StreamOperator<IN, OUT> implements 
Serializable {
                        callUserFunction();
                } catch (Exception e) {
                        if (LOG.isErrorEnabled()) {
-                               LOG.error("Calling user function failed due to: 
{}",
-                                               
StringUtils.stringifyException(e));
+                               LOG.error("Calling user function failed", e);
                        }
                        throw new RuntimeException(e);
                }
@@ -168,7 +166,7 @@ public abstract class StreamOperator<IN, OUT> implements 
Serializable {
                try {
                        FunctionUtils.closeFunction(userFunction);
                } catch (Exception e) {
-                       throw new RuntimeException("Error when closing the 
function: " + e.getMessage());
+                       throw new RuntimeException("Error when closing the 
function", e);
                }
        }
 
@@ -187,8 +185,7 @@ public abstract class StreamOperator<IN, OUT> implements 
Serializable {
        public void setChainingStrategy(ChainingStrategy strategy) {
                if (strategy == ChainingStrategy.ALWAYS) {
                        if (!(this instanceof ChainableStreamOperator)) {
-                               throw new RuntimeException(
-                                               "Operator needs to extend 
ChainableOperator to be chained");
+                               throw new RuntimeException("Operator needs to 
extend ChainableOperator to be chained");
                        }
                }
                this.chainingStrategy = strategy;

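The StreamOperator changes above stop stringifying exceptions and instead pass them along as the cause, so the original stack trace survives both in the log and in the rethrown RuntimeException. A tiny illustration of the difference, with a placeholder failing call:

        try {
                closeSomething();   // placeholder for FunctionUtils.closeFunction(userFunction)
        } catch (Exception e) {
                // before: only the message survived, the stack trace of e was lost
                //   throw new RuntimeException("Error when closing the function: " + e.getMessage());

                // after: the cause is chained, so e's stack trace is preserved
                throw new RuntimeException("Error when closing the function", e);
        }
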
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
deleted file mode 100644
index 4dd4b45..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Base class for representing operator states that can be repartitioned for
- * state state and load balancing.
- * 
- * @param <T>
- *            The type of the operator state.
- */
-public abstract class PartitionableState<T> extends OperatorState<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       PartitionableState(T initialState) {
-               super(initialState);
-       }
-
-       /**
-        * Repartitions(divides) the current state into the given number of new
-        * partitions. The created partitions will be used to redistribute then
-        * rebuild the state among the parallel instances of the operator. The
-        * implementation should reflect the partitioning of the input values to
-        * maintain correct operator behavior.
-        * 
-        * </br> </br> It is also assumed that if we would {@link #reBuild} the
-        * repartitioned state we would basically get the same as before.
-        * 
-        * 
-        * @param numberOfPartitions
-        *            The desired number of partitions. The method must return 
an
-        *            array of that size.
-        * @return The array containing the state part for each partition.
-        */
-       public abstract OperatorState<T>[] repartition(int numberOfPartitions);
-
-       /**
-        * Rebuilds the current state partition from the given parts. Used for
-        * building the state after a re-balance phase.
-        * 
-        * @param parts
-        *            The state parts that will be used to rebuild the current
-        *            partition.
-        * @return The rebuilt operator state.
-        */
-       public abstract OperatorState<T> reBuild(OperatorState<T>... parts);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 930c9b4..0259568 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -273,7 +273,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable 
implements StreamTask
                synchronized (checkpointLock) {
                        if (isRunning) {
                                try {
-                                       LOG.info("Starting checkpoint " + 
checkpointId);
+                                       LOG.info("Starting checkpoint {} on 
task {}", checkpointId, getName());
                                        
                                        // first draw the state that should go 
into checkpoint
                                        LocalStateHandle state;
@@ -282,7 +282,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable 
implements StreamTask
                                                state = userState == null ? 
null : new LocalStateHandle(userState);
                                        }
                                        catch (Exception e) {
-                                               throw new Exception("Error 
while drawing snapshot of the user state.");
+                                               throw new Exception("Error 
while drawing snapshot of the user state.", e);
                                        }
                        
                                        // now emit the checkpoint barriers
@@ -333,8 +333,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable 
implements StreamTask
                                triggerCheckpoint(sStep.getId(), 
sStep.getTimestamp());
                        }
                        catch (Exception e) {
-                               throw new RuntimeException(
-                                               "Error triggering a checkpoint 
as the result of receiving checkpoint barrier", e);
+                               throw new RuntimeException("Error triggering a 
checkpoint as the result of receiving checkpoint barrier", e);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index faaa79b..87c9757 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
 import java.io.Serializable;
 
-public interface DeserializationSchema<T> extends Serializable {
+public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
        /**
         * Deserializes the incoming data.

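Since DeserializationSchema now extends ResultTypeQueryable, every schema has to report the type it produces; this is what the new addSource() type resolution in StreamExecutionEnvironment (see above) relies on. A minimal custom schema under the new contract might look like this sketch; the UTF-8 string encoding is only an illustrative choice, the method signatures follow the schemas changed elsewhere in this patch, and imports are omitted.

        public class Utf8StringSchema implements DeserializationSchema<String> {

                private static final long serialVersionUID = 1L;

                @Override
                public String deserialize(byte[] message) {
                        return new String(message, Charset.forName("UTF-8"));
                }

                @Override
                public boolean isEndOfStream(String nextElement) {
                        return false;   // never signal end of stream based on an element
                }

                // new: required because DeserializationSchema extends ResultTypeQueryable<T>
                @Override
                public TypeInformation<String> getProducedType() {
                        return TypeExtractor.getForClass(String.class);
                }
        }
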
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
index 93d13ab..a4b1419 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 public class JavaDefaultStringSchema implements DeserializationSchema<String>, 
SerializationSchema<String, byte[]> {
 
@@ -38,4 +40,9 @@ public class JavaDefaultStringSchema implements 
DeserializationSchema<String>, S
                return SerializationUtils.deserialize(message);
        }
 
+       @Override
+       public TypeInformation<String> getProducedType() {
+               return TypeExtractor.getForClass(String.class);
+       }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
index e457bef..9c5885f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 public class RawSchema implements DeserializationSchema<byte[]>,
                SerializationSchema<byte[], byte[]> {
 
@@ -36,4 +39,9 @@ public class RawSchema implements 
DeserializationSchema<byte[]>,
        public byte[] serialize(byte[] element) {
                return element;
        }
+
+       @Override
+       public TypeInformation<byte[]> getProducedType() {
+               return TypeExtractor.getForClass(byte[].class);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 3d0a0d5..7c5946d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 public class SimpleStringSchema implements DeserializationSchema<String>,
                SerializationSchema<String, String> {
 
@@ -37,4 +40,8 @@ public class SimpleStringSchema implements 
DeserializationSchema<String>,
                return element;
        }
 
+       @Override
+       public TypeInformation<String> getProducedType() {
+               return TypeExtractor.getForClass(String.class);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
deleted file mode 100644
index 136a091..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
-       @Test
-       public void testOperatorState() {
-               OperatorState<Integer> os = new OperatorState<Integer>(5);
-
-               StateCheckpoint<Integer> scp = os.checkpoint();
-
-               assertTrue(os.stateEquals(scp.restore()));
-
-               assertEquals((Integer) 5, os.getState());
-
-               os.update(10);
-
-               assertEquals((Integer) 10, os.getState());
-       }
-
-}
