http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java deleted file mode 100644 index bf3b3f4..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple.iterator; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import kafka.consumer.ConsumerConfig; -import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils; -import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata; -import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset; -import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset; -import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.cluster.Broker; -import kafka.common.ErrorMapping; -import kafka.common.NotLeaderForPartitionException; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.message.MessageAndOffset; - -/** - * Iterates the records received from a partition of a Kafka topic as byte arrays. - * - * This code is in parts based on https://gist.github.com/ashrithr/5811266. 
- */ -public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class); - - private List<String> hosts; - private String topic; - private int partition; - private long readOffset; - private transient SimpleConsumer consumer; - private List<String> replicaBrokers; - private String clientName; - private Broker leadBroker; - private final ConsumerConfig consumerConfig; - - private KafkaOffset initialOffset; - private transient Iterator<MessageAndOffset> iter; - private transient FetchResponse fetchResponse; - - /** - * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service - * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers. - * - * @param topic - * Name of the topic to listen to - * @param partition - * Partition in the chosen topic - * @param initialOffset - * Offset to start consuming at - * @param kafkaTopicUtils - * Util for receiving topic metadata - */ - public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset, - KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) { - - Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition); - this.hosts = new ArrayList<String>(brokerAddresses); - - this.consumerConfig = consumerConfig; - this.topic = topic; - this.partition = partition; - - this.initialOffset = initialOffset; - - this.replicaBrokers = new ArrayList<String>(); - } - - // -------------------------------------------------------------------------------------------- - // Initializing a connection - // -------------------------------------------------------------------------------------------- - - /** - * Initializes the connection by detecting the leading broker of - * the topic and establishing a connection to it. 
- */ - public void initialize() { - if (LOG.isInfoEnabled()) { - LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts); - } - - PartitionMetadata metadata = getPartitionMetadata(); - - leadBroker = metadata.leader(); - clientName = "Client_" + topic + "_" + partition; - - consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName); - - try { - readOffset = initialOffset.getOffset(consumer, topic, partition, clientName); - } catch (NotLeaderForPartitionException e) { - throw new RuntimeException("Unable to get offset",e); - } - - try { - resetFetchResponse(readOffset); - } catch (ClosedChannelException e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Got ClosedChannelException, trying to find new leader."); - } - findNewLeader(); - } - } - - private PartitionMetadata getPartitionMetadata() { - PartitionMetadata metadata; - int retry = 0; - int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER__MS_DEFAULT); - do { - metadata = findLeader(hosts, topic, partition); - /*try { - Thread.sleep(10000); - } catch (InterruptedException e) { - throw new RuntimeException("Establishing connection to Kafka failed", e); - } */ - if(metadata == null) { - retry++; - if(retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) { - throw new RuntimeException("Tried finding a leader "+retry+" times without success"); - } - LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far {}",waitTime, retry-1); - try { - Thread.sleep(waitTime); - } catch (InterruptedException e) { - throw new RuntimeException("Establishing connection to Kafka failed", e); - } - } - } while (metadata == null); - - if (metadata.leader() == null) { - throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")"); - } - - return metadata; - } - - /** - * Sets the partition to read from. - * - * @param partition - * partition number - */ - public void setPartition(int partition) { - this.partition = partition; - } - - // -------------------------------------------------------------------------------------------- - // Iterator methods - // -------------------------------------------------------------------------------------------- - - /** - * Convenience method to emulate iterator behaviour. - * - * @return whether the iterator has a next element - */ - public boolean hasNext() { - return true; - } - - /** - * Returns the next message received from Kafka as a - * byte array. - * - * @return next message as a byte array. - */ - public byte[] next() { - return nextWithOffset().getMessage(); - } - - public boolean fetchHasNext() { - synchronized (fetchResponse) { - if (!iter.hasNext()) { - try { - resetFetchResponse(readOffset); - } catch (ClosedChannelException e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Got ClosedChannelException, trying to find new leader.", e); - } - findNewLeader(); - } - return iter.hasNext(); - } else { - return true; - } - } - } - - /** - * Returns the next message and its offset received from - * Kafka encapsulated in a POJO. - * - * @return next message and its offset. 
- */ - public MessageWithMetadata nextWithOffset() { - - synchronized (fetchResponse) { - if (!iter.hasNext()) { - throw new RuntimeException( - "Trying to read when response is not fetched. Call fetchHasNext() first."); - } - - MessageAndOffset messageAndOffset = iter.next(); - long currentOffset = messageAndOffset.offset(); - - while (currentOffset < readOffset) { - messageAndOffset = iter.next(); - currentOffset = messageAndOffset.offset(); - } - - readOffset = messageAndOffset.nextOffset(); - ByteBuffer payload = messageAndOffset.message().payload(); - - byte[] bytes = new byte[payload.limit()]; - payload.get(bytes); - - return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition); - } - } - - // -------------------------------------------------------------------------------------------- - // Internal utilities - // -------------------------------------------------------------------------------------------- - - private void resetFetchResponse(long offset) throws ClosedChannelException { - FetchRequest req = new FetchRequestBuilder().clientId(clientName) - .addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build(); - - fetchResponse = consumer.fetch(req); - - if (fetchResponse.hasError()) { - short code = fetchResponse.errorCode(topic, partition); - - if (LOG.isErrorEnabled()) { - LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); - } - - if (code == ErrorMapping.OffsetOutOfRangeCode()) { - if (LOG.isErrorEnabled()) { - LOG.error("Asked for invalid offset {}", offset); - } - String reset = consumerConfig.autoOffsetReset(); - if(reset.equals("smallest")) { - LOG.info("Setting read offset to beginning (smallest)"); - readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName); - } else if(reset.equals("largest")) { - LOG.info("Setting read offset to current offset (largest)"); - readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName); - } else { - throw new RuntimeException("Unknown 'autooffset.reset' mode '"+reset+"' Supported values are 'smallest' and 'largest'."); - } - } - - findNewLeader(); - } - - iter = fetchResponse.messageSet(topic, partition).iterator(); - } - - private void findNewLeader() { - consumer.close(); - consumer = null; - leadBroker = findNewLeader(leadBroker, topic, partition); - consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName); - } - - private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) { - - PartitionMetadata returnMetaData = null; - loop: - for (String address : addresses) { - - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to find leader via broker: {}", address); - } - - String[] split = address.split(":"); - String host = split[0]; - int port = Integer.parseInt(split[1]); - - SimpleConsumer consumer = null; - try { - consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup"); - List<String> topics = Collections.singletonList(topic); - - TopicMetadataRequest req = new TopicMetadataRequest(topics); - - kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); - - List<TopicMetadata> metaData = resp.topicsMetadata(); - for (TopicMetadata item : metaData) { - for (PartitionMetadata part : item.partitionsMetadata()) { - if (part.partitionId() == partition) { - returnMetaData = part; - break loop; - } - } - } - } catch 
(Exception e) { - if (e instanceof ClosedChannelException) { - LOG.warn("Got ClosedChannelException while trying to communicate with Broker" + - "[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition); - } else { - throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e); - } - } finally { - if (consumer != null) { - consumer.close(); - } - } - } - if (returnMetaData != null) { - replicaBrokers.clear(); - for (kafka.cluster.Broker replica : returnMetaData.replicas()) { - replicaBrokers.add(replica.host() + ":" + replica.port()); - } - } - return returnMetaData; - } - - private Broker findNewLeader(Broker oldLeader, String topic, int a_partition) { - for (int i = 0; i < 3; i++) { - if (LOG.isInfoEnabled()) { - LOG.info("Trying to find a new leader after Broker failure."); - } - boolean goToSleep = false; - PartitionMetadata metadata = findLeader(replicaBrokers, topic, a_partition); - if (metadata == null) { - goToSleep = true; - } else if (metadata.leader() == null) { - goToSleep = true; - } else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) { - // first time through if the leader hasn't changed give ZooKeeper a second to recover - // second time, assume the broker did recover before failover, or it was a non-Broker issue - // - goToSleep = true; - } else { - return metadata.leader(); - } - if (goToSleep) { - try { - Thread.sleep(10000); - } catch (InterruptedException ie) { - } - } - } - throw new RuntimeException("Unable to find new leader after Broker failure."); - } - - public int getId() { - return this.partition; - } - - @Override - public String toString() { - return "SinglePartitionIterator{partition="+partition+" readOffset="+readOffset+"}"; - } -}
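For reference, the read path that the deleted KafkaSinglePartitionIterator wrapped reduces to a short fetch loop against the Kafka 0.8 low-level ("simple") consumer API. A minimal sketch, assuming the leader's host/port and a valid start offset are already known; it uses only calls that appear in the removed file (SimpleConsumer, FetchRequestBuilder, FetchResponse, MessageAndOffset) and omits the leader-failover and offset-reset handling that made up most of the class:

import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleConsumerFetchSketch {

	public static void main(String[] args) {
		String topic = "myTopic";   // illustrative values; a real client first
		int partition = 0;          // locates the leader via a TopicMetadataRequest
		long readOffset = 0L;

		SimpleConsumer consumer = new SimpleConsumer("localhost", 9092,
				30000 /* socket timeout ms */, 65536 /* receive buffer bytes */,
				"Client_" + topic + "_" + partition);

		FetchRequest req = new FetchRequestBuilder()
				.clientId("Client_" + topic + "_" + partition)
				.addFetch(topic, partition, readOffset, 1024 * 1024) // max bytes per fetch
				.build();
		FetchResponse fetchResponse = consumer.fetch(req);

		for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
			ByteBuffer payload = messageAndOffset.message().payload();
			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);                          // copy the record value out of the buffer
			readOffset = messageAndOffset.nextOffset();  // advance past the record just read
		}
		consumer.close();
	}
}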
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java deleted file mode 100644 index 15e7b36..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple.offset; - -import kafka.api.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; - -public class BeginningOffset extends KafkaOffset { - - private static final long serialVersionUID = 1L; - - @Override - public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) { - return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java deleted file mode 100644 index 6119f32..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple.offset; - -import kafka.api.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; - -public class CurrentOffset extends KafkaOffset { - - private static final long serialVersionUID = 1L; - - @Override - public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) { - return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java deleted file mode 100644 index 3aec7ff..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple.offset; - -import kafka.javaapi.consumer.SimpleConsumer; - -/** - * Offset given by a message read from Kafka. 
- */ -public class GivenOffset extends KafkaOffset { - - private static final long serialVersionUID = 1L; - private final long offset; - - public GivenOffset(long offset) { - this.offset = offset; - } - - @Override - public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) { - return offset; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java deleted file mode 100644 index 2eaa2b8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple.offset; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; - -/** - * Superclass for various kinds of KafkaOffsets. 
- */ -public abstract class KafkaOffset implements Serializable { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class); - - private static final long serialVersionUID = 1L; - - public abstract long getOffset(SimpleConsumer consumer, String topic, int partition, - String clientName); - - /** - * - * @param consumer - * @param topic - * @param partition - * @param whichTime Type of offset request (latest time / earliest time) - * @param clientName - * @return - */ - protected long getLastOffset(SimpleConsumer consumer, String topic, int partition, - long whichTime, String clientName) { - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); - - kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, - kafka.api.OffsetRequest.CurrentVersion(), clientName); - OffsetResponse response = consumer.getOffsetsBefore(request); - - while (response.hasError()) { - int errorCode = response.errorCode(topic, partition); - LOG.warn("Response has error. Error code "+errorCode); - switch (errorCode) { - case 6: - case 3: - LOG.warn("Kafka broker trying to fetch from a non-leader broker."); - break; - default: - throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode); - } - - request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); - response = consumer.getOffsetsBefore(request); - } - - long[] offsets = response.offsets(topic, partition); - if(offsets.length > 1) { - LOG.warn("The offset response unexpectedly contained more than one offset: "+ Arrays.toString(offsets) + " Using only first one"); - } - return offsets[0]; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java deleted file mode 100644 index 02c49df..0000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.api.simple.offset; - -/** - * Enum controlling the offset behavior of the PersistentKafkaSource. - */ -public enum Offset { - /** - * Read the Kafka topology from the beginning - */ - FROM_BEGINNING, - /** - * Read the topology from the current offset. (Default). - */ - FROM_CURRENT -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 87c6a34..246756c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -25,14 +25,20 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.List; import java.util.Properties; import java.util.Random; +import kafka.admin.AdminUtils; +import kafka.api.PartitionMetadata; import kafka.consumer.ConsumerConfig; -import org.apache.commons.lang.SerializationUtils; +import kafka.network.SocketServer; +import org.I0Itec.zkclient.ZkClient; import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; @@ -40,25 +46,25 @@ import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.connectors.kafka.api.KafkaSink; import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils; -import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset; +import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource; import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import 
org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -66,6 +72,7 @@ import org.slf4j.LoggerFactory; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; +import scala.collection.Seq; /** * Code in this test is based on the following GitHub repository: @@ -91,9 +98,15 @@ public class KafkaITCase { private static TestingServer zookeeper; private static List<KafkaServer> brokers; + private static String brokerConnectionStrings = ""; private static boolean shutdownKafkaBroker; + private static ConsumerConfig standardCC; + + private static ZkClient zkClient; + + @BeforeClass public static void prepare() throws IOException { LOG.info("Starting KafkaITCase.prepare()"); @@ -118,6 +131,12 @@ public class KafkaITCase { brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS); for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); + SocketServer socketServer = brokers.get(i).socketServer(); + String host = "localhost"; + if(socketServer.host() != null) { + host = socketServer.host(); + } + brokerConnectionStrings += host+":"+socketServer.port()+","; } LOG.info("ZK and KafkaServer started."); @@ -125,6 +144,17 @@ public class KafkaITCase { LOG.warn("Test failed with exception", t); Assert.fail("Test failed with: " + t.getMessage()); } + + Properties cProps = new Properties(); + cProps.setProperty("zookeeper.connect", zookeeperConnectionString); + cProps.setProperty("group.id", "flink-tests"); + cProps.setProperty("auto.commit.enable", "false"); + + cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. + + standardCC = new ConsumerConfig(cProps); + + zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer()); } @AfterClass @@ -142,8 +172,196 @@ public class KafkaITCase { LOG.warn("ZK.stop() failed", e); } } + zkClient.close(); + } + + + @Test + public void testOffsetManipulation() { + ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer()); + + final String topicName = "testOffsetManipulation"; + + // create topic + Properties topicConfig = new Properties(); + LOG.info("Creating topic {}", topicName); + AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig); + + PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337); + + Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0)); + + zk.close(); + } + /** + * We want to use the High level java consumer API but manage the offset in Zookeeper manually. 
+ * + */ + @Test + public void testPersistentSourceWithOffsetUpdates() throws Exception { + LOG.info("Starting testPersistentSourceWithOffsetUpdates()"); + + ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer()); + + final String topicName = "testOffsetHacking"; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3); + env.getConfig().disableSysoutLogging(); + env.enableCheckpointing(50); + env.setNumberOfExecutionRetries(0); + + // create topic + Properties topicConfig = new Properties(); + LOG.info("Creating topic {}", topicName); + AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig); + + // write a sequence from 0 to 99 to each of the three partitions. + writeSequence(env, topicName, 0, 99); + + readSequence(env, standardCC, topicName, 0, 100, 300); + + // check offsets + Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0)); + Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1)); + Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2)); + + + LOG.info("Manipulating offsets"); + // set the offset to 25, 50, and 75 for the three partitions + PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50); + PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50); + PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50); + + // create new env + env = StreamExecutionEnvironment.createLocalEnvironment(3); + env.getConfig().disableSysoutLogging(); + readSequence(env, standardCC, topicName, 50, 50, 150); + + zk.close(); + + LOG.info("Finished testPersistentSourceWithOffsetUpdates()"); + } + + private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception { + LOG.info("Reading sequence for verification until final count {}", finalCount); + DataStream<Tuple2<Integer, Integer>> source = env.addSource( + new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc) + ) + //add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have + // to play this trick. 
The problem is that we have to wait until all checkpoints are confirmed + .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { + Thread.sleep(75); + return value; + } + }).setParallelism(3); + + // verify data + DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() { + int[] values = new int[valuesCount]; + int count = 0; + + @Override + public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception { + values[value.f1 - valuesStartFrom]++; + count++; + + LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount); + // verify if we've seen everything + + if (count == finalCount) { + LOG.info("Received all values"); + for (int i = 0; i < values.length; i++) { + int v = values[i]; + if (v != 3) { + throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values)); + } + } + // test has passed + throw new SuccessException(); + } + } + + }).setParallelism(1); + + tryExecute(env, "Read data from Kafka"); + + LOG.info("Successfully read sequence for verification"); + } + + + + private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception { + LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName); + DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { + private static final long serialVersionUID = 1L; + boolean running = true; + + @Override + public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception { + LOG.info("Starting source."); + int cnt = from; + int partition = getRuntimeContext().getIndexOfThisSubtask(); + while (running) { + LOG.info("Writing " + cnt + " to partition " + partition); + collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); + if (cnt == to) { + LOG.info("Writer reached end."); + return; + } + cnt++; + } + } + + @Override + public void cancel() { + LOG.info("Source got cancel()"); + running = false; + } + }).setParallelism(3); + stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings, + topicName, + new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()), + new T2Partitioner() + )).setParallelism(3); + env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName); + LOG.info("Finished writing sequence"); + } + + private static class T2Partitioner implements SerializableKafkaPartitioner { + @Override + public int partition(Object key, int numPartitions) { + if(numPartitions != 3) { + throw new IllegalArgumentException("Expected three partitions"); + } + Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key; + return element.f0; + } + } + + public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception { + try { + see.execute(name); + } catch (JobExecutionException good) { + Throwable t = good.getCause(); + int limit = 0; + while (!(t instanceof SuccessException)) { + if(t == null) { + LOG.warn("Test failed with exception", good); + Assert.fail("Test failed with: " + good.getMessage()); + } + + t = t.getCause(); + if (limit++ == 20) { + LOG.warn("Test failed with exception", good); + 
Assert.fail("Test failed with: " + good.getMessage()); + } + } + } } + @Test public void regularKafkaSourceTest() throws Exception { LOG.info("Starting KafkaITCase.regularKafkaSourceTest()"); @@ -152,10 +370,9 @@ public class KafkaITCase { createTestTopic(topic, 1, 1); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); - // add consuming topology: DataStreamSource<Tuple2<Long, String>> consuming = env.addSource( - new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000)); + new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000)); consuming.addSink(new SinkFunction<Tuple2<Long, String>>() { int elCnt = 0; int start = -1; @@ -210,22 +427,9 @@ public class KafkaITCase { running = false; } }); - stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema())); + stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()))); - try { - env.setParallelism(1); - env.execute(); - } catch (JobExecutionException good) { - Throwable t = good.getCause(); - int limit = 0; - while (!(t instanceof SuccessException)) { - t = t.getCause(); - if (limit++ == 20) { - LOG.warn("Test failed with exception", good); - Assert.fail("Test failed with: " + good.getMessage()); - } - } - } + tryExecute(env, "regular kafka source test"); LOG.info("Finished KafkaITCase.regularKafkaSourceTest()"); } @@ -241,7 +445,10 @@ public class KafkaITCase { // add consuming topology: DataStreamSource<Tuple2<Long, String>> consuming = env.addSource( - new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING)); + new PersistentKafkaSource<Tuple2<Long, String>>(topic, + new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), + standardCC + )); consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() { int elCnt = 0; int start = -1; @@ -304,22 +511,9 @@ public class KafkaITCase { running = false; } }); - stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema())); + stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()))); - try { - env.setParallelism(1); - env.execute(); - } catch (JobExecutionException good) { - Throwable t = good.getCause(); - int limit = 0; - while (!(t instanceof SuccessException)) { - t = t.getCause(); - if (limit++ == 20) { - LOG.warn("Test failed with exception", good); - Assert.fail("Test failed with: " + good.getMessage()); - } - } - } + tryExecute(env, "tupletesttopology"); LOG.info("Finished KafkaITCase.tupleTestTopology()"); } @@ -347,16 +541,19 @@ public class KafkaITCase { consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30)); consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString); consumerProps.setProperty("group.id", "test"); + consumerProps.setProperty("auto.commit.enable", "false"); + consumerProps.setProperty("auto.offset.reset", 
"smallest"); ConsumerConfig cc = new ConsumerConfig(consumerProps); DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource( - new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, Offset.FROM_BEGINNING, cc)); + new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc)); consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() { int elCnt = 0; @Override public void invoke(Tuple2<Long, byte[]> value) throws Exception { + LOG.info("Received {}", value.f0); elCnt++; if(value.f0 == -1) { // we should have seen 11 elements now. @@ -370,7 +567,7 @@ public class KafkaITCase { throw new RuntimeException("More than 10 elements seen: "+elCnt); } } - }); + }).setParallelism(1); // add producing topology DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() { @@ -398,6 +595,7 @@ public class KafkaITCase { } catch (InterruptedException ignored) { } if(cnt == 10) { + LOG.info("Send end signal"); // signal end collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1})); running = false; @@ -412,31 +610,16 @@ public class KafkaITCase { } }); - stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(zookeeperConnectionString, topic, + stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig())) ); - try { - env.setParallelism(1); - env.execute(); - } catch (JobExecutionException good) { - Throwable t = good.getCause(); - int limit = 0; - while (!(t instanceof SuccessException)) { - t = t.getCause(); - if (limit++ == 20) { - LOG.warn("Test failed with exception", good); - Assert.fail("Test failed with: " + good.getMessage()); - } - } - } + tryExecute(env, "big topology test"); LOG.info("Finished KafkaITCase.bigRecordTestTopology()"); } - private static boolean partitionerHasBeenCalled = false; - @Test public void customPartitioningTestTopology() throws Exception { LOG.info("Starting KafkaITCase.customPartitioningTestTopology()"); @@ -449,7 +632,9 @@ public class KafkaITCase { // add consuming topology: DataStreamSource<Tuple2<Long, String>> consuming = env.addSource( - new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING)); + new PersistentKafkaSource<Tuple2<Long, String>>(topic, + new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), + standardCC)); consuming.addSink(new SinkFunction<Tuple2<Long, String>>() { int start = -1; BitSet validator = new BitSet(101); @@ -519,23 +704,9 @@ public class KafkaITCase { running = false; } }); - stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner())); - - try { - env.setParallelism(1); - env.execute(); - } catch (JobExecutionException good) { - Throwable t = good.getCause(); - int limit = 0; - while (!(t instanceof SuccessException)) { - t = t.getCause(); - if (limit++ == 20) { - throw good; - } - } + stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner())); - assertTrue(partitionerHasBeenCalled); - } + tryExecute(env, "custom partitioning test"); LOG.info("Finished KafkaITCase.customPartitioningTestTopology()"); } @@ -547,7 
+718,6 @@ public class KafkaITCase { @Override public int partition(Object key, int numPartitions) { - partitionerHasBeenCalled = true; @SuppressWarnings("unchecked") Tuple2<Long, String> tuple = (Tuple2<Long, String>) key; @@ -561,25 +731,6 @@ public class KafkaITCase { } } - private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> { - - @SuppressWarnings("unchecked") - @Override - public Tuple2<Long, String> deserialize(byte[] message) { - Object deserializedObject = SerializationUtils.deserialize(message); - return (Tuple2<Long, String>) deserializedObject; - } - - @Override - public byte[] serialize(Tuple2<Long, String> element) { - return SerializationUtils.serialize(element); - } - - @Override - public boolean isEndOfStream(Tuple2<Long, String> nextElement) { - return false; - } - } @Test public void simpleTestTopology() throws Exception { @@ -591,7 +742,7 @@ public class KafkaITCase { // add consuming topology: DataStreamSource<String> consuming = env.addSource( - new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING)); + new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC)); consuming.addSink(new SinkFunction<String>() { int elCnt = 0; int start = -1; @@ -643,34 +794,34 @@ public class KafkaITCase { running = false; } }); - stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema())); + stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema())); - try { - env.setParallelism(1); - env.execute(); - } catch (JobExecutionException good) { - Throwable t = good.getCause(); - int limit = 0; - while (!(t instanceof SuccessException)) { - t = t.getCause(); - if (limit++ == 20) { - LOG.warn("Test failed with exception", good); - Assert.fail("Test failed with: " + good.getMessage()); - } - } - } + tryExecute(env, "simpletest"); } private static boolean leaderHasShutDown = false; - @Test + @Test(timeout=60000) public void brokerFailureTest() throws Exception { String topic = "brokerFailureTestTopic"; createTestTopic(topic, 2, 2); - KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); - final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString(); + // KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); + // final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString(); + + PartitionMetadata firstPart = null; + do { + if(firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); + // not the first try. 
Sleep a bit + Thread.sleep(150); + } + Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata(); + firstPart = partitionMetadata.head(); + } while(firstPart.errorCode() != 0); + + final String leaderToShutDown = firstPart.leader().get().connectionString(); final Thread brokerShutdown = new Thread(new Runnable() { @Override @@ -704,7 +855,7 @@ public class KafkaITCase { // add consuming topology: DataStreamSource<String> consuming = env.addSource( - new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING)); + new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC)); consuming.setParallelism(1); consuming.addSink(new SinkFunction<String>() { @@ -717,7 +868,7 @@ public class KafkaITCase { @Override public void invoke(String value) throws Exception { - LOG.info("Got message = " + value + " leader has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ numOfMessagesToBeCorrect); + LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect); String[] sp = value.split("-"); int v = Integer.parseInt(sp[1]); @@ -736,8 +887,8 @@ public class KafkaITCase { shutdownKafkaBroker = true; } - if(leaderHasShutDown) { // it only makes sence to check once the shutdown is completed - if (elCnt >= stopAfterMessages ) { + if (leaderHasShutDown) { // it only makes sence to check once the shutdown is completed + if (elCnt >= stopAfterMessages) { // check if everything in the bitset is set to true int nc; if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) { @@ -779,29 +930,18 @@ public class KafkaITCase { running = false; } }); - stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema())) + stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema())) .setParallelism(1); - try { - env.setParallelism(1); - env.execute(); - } catch (JobExecutionException good) { - Throwable t = good.getCause(); - int limit = 0; - while (!(t instanceof SuccessException)) { - t = t.getCause(); - if (limit++ == 20) { - LOG.warn("Test failed with exception", good); - Assert.fail("Test failed with: " + good.getMessage()); - } - } - } + tryExecute(env, "broker failure test"); } private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { - KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); - kafkaTopicUtils.createTopic(topic, numberOfPartitions, replicationFactor); + // create topic + Properties topicConfig = new Properties(); + LOG.info("Creating topic {}", topic); + AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig); } private static TestingServer getZookeeper() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java deleted file mode 100644 index 5f0e198..0000000 --- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.curator.test.TestingServer; -import org.apache.flink.runtime.net.NetUtils; -import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils; -import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.api.PartitionMetadata; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; - -public class KafkaTopicUtilsTest { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class); - private static final int NUMBER_OF_BROKERS = 2; - private static final String TOPIC = "myTopic"; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void test() { - int zkPort; - String kafkaHost; - String zookeeperConnectionString; - - File tmpZkDir; - List<File> tmpKafkaDirs; - Map<String, KafkaServer> kafkaServers = null; - TestingServer zookeeper = null; - - try { - tmpZkDir = tempFolder.newFolder(); - - tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS); - for (int i = 0; i < NUMBER_OF_BROKERS; i++) { - tmpKafkaDirs.add(tempFolder.newFolder()); - } - - zkPort = NetUtils.getAvailablePort(); - kafkaHost = InetAddress.getLocalHost().getHostName(); - zookeeperConnectionString = "localhost:" + zkPort; - - // init zookeeper - zookeeper = new TestingServer(zkPort, tmpZkDir); - - // init kafka kafkaServers - kafkaServers = new HashMap<String, KafkaServer>(); - - for (int i = 0; i < NUMBER_OF_BROKERS; i++) { - KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i)); - kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer); - } - - // create Kafka topic - final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); - kafkaTopicUtils.createTopic(TOPIC, 1, 2); - - // check whether topic exists - assertTrue(kafkaTopicUtils.topicExists(TOPIC)); - - // check number of 
partitions - assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC)); - - // get partition metadata without error - PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0); - assertEquals(0, partitionMetadata.errorCode()); - - // get broker list - assertEquals(new HashSet<String>(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC)); - } catch (IOException e) { - fail(e.toString()); - } catch (Exception e) { - fail(e.toString()); - } finally { - LOG.info("Shutting down all services"); - for (KafkaServer broker : kafkaServers.values()) { - if (broker != null) { - broker.shutdown(); - } - } - - if (zookeeper != null) { - try { - zookeeper.stop(); - } catch (IOException e) { - LOG.warn("ZK.stop() failed", e); - } - } - } - } - - /** - * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) - */ - private static KafkaServer getKafkaServer(String kafkaHost, String zookeeperConnectionString, int brokerId, File tmpFolder) throws UnknownHostException { - Properties kafkaProperties = new Properties(); - - int kafkaPort = NetUtils.getAvailablePort(); - - // properties have to be Strings - kafkaProperties.put("advertised.host.name", kafkaHost); - kafkaProperties.put("port", Integer.toString(kafkaPort)); - kafkaProperties.put("broker.id", Integer.toString(brokerId)); - kafkaProperties.put("log.dir", tmpFolder.toString()); - kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); - KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); - - KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime()); - server.startup(); - return server; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties index dc20726..9ede613 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. 
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
index dc20726..9ede613 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 196f7ec..4bd89c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
+import java.io.Serializable;
+
 /**
  * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
  * Similar to the {@link Checkpointed} interface, the function must produce a
@@ -32,4 +34,4 @@
  * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
  * of the actual state.</p>
  */
-public interface CheckpointedAsynchronously extends Checkpointed {}
+public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
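The new type parameter threads the state type through to Checkpointed<T>, so a user function now declares what it snapshots. A minimal sketch of a function under the new signature (hypothetical class; it assumes the Checkpointed contract of snapshotState(long, long) and restoreState(T) that the Javadoc above refers to):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

public class CountingMapper implements MapFunction<String, String>,
		CheckpointedAsynchronously<Long> {

	private long count = 0;

	@Override
	public String map(String value) {
		count++;
		return value;
	}

	@Override
	public Long snapshotState(long checkpointId, long checkpointTimestamp) {
		// Long is immutable, so the returned snapshot stays consistent even
		// while map() keeps mutating count during asynchronous materialization
		return count;
	}

	@Override
	public void restoreState(Long state) {
		count = state;
	}
}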
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index a59407c..3efad93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -30,8 +30,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute() throws Exception {
-		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
-				getConfig().isSysoutLoggingEnabled());
+		return execute(DEFAULT_JOB_NAME);
 	}
 
 	/**
@@ -44,7 +43,9 @@
 	 */
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
-				getConfig().isSysoutLoggingEnabled());
+		JobExecutionResult result = ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
+				getConfig().isSysoutLoggingEnabled());
+		streamGraph.clear(); // clear graph to allow submitting another job via the same environment.
+		return result;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b0471f9..2e33b82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
-import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
@@ -67,6 +67,8 @@ import com.esotericsoftware.kryo.Serializer;
  */
 public abstract class StreamExecutionEnvironment {
 
+	public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
 	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
 
 	private long bufferTimeout = 100;
@@ -624,8 +626,8 @@
 
 		TypeInformation<OUT> outTypeInfo;
 
-		if (function instanceof GenericSourceFunction) {
-			outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+		if (function instanceof ResultTypeQueryable) {
+			outTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
 		} else {
 			try {
 				outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
deleted file mode 100644
index 0113cfe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public interface GenericSourceFunction<T> {
-
-	TypeInformation<T> getType();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 93bf8eb..271c05c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -60,8 +60,7 @@ public class StreamGraph extends StreamingPlan {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
-	private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
-	private String jobName = DEAFULT_JOB_NAME;
+	private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
 
 	private final StreamExecutionEnvironment environemnt;
 	private final ExecutionConfig executionConfig;
@@ -70,17 +69,25 @@
 	private long checkpointingInterval = 5000;
 	private boolean chaining = true;
 
-	private final Map<Integer, StreamNode> streamNodes;
-	private final Set<Integer> sources;
+	private Map<Integer, StreamNode> streamNodes;
+	private Set<Integer> sources;
 
-	private final Map<Integer, StreamLoop> streamLoops;
-	protected final Map<Integer, StreamLoop> vertexIDtoLoop;
+	private Map<Integer, StreamLoop> streamLoops;
+	protected Map<Integer, StreamLoop> vertexIDtoLoop;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
 
 		this.environemnt = environment;
 		executionConfig = environment.getConfig();
 
+		// create an empty new stream graph.
+		clear();
+	}
+
+	/**
+	 * Remove all registered nodes etc.
+	 */
+	public void clear() {
 		streamNodes = new HashMap<Integer, StreamNode>();
 		streamLoops = new HashMap<Integer, StreamLoop>();
 		vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
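Paired with the LocalStreamEnvironment change above, clear() is what lets a single environment submit several topologies in sequence. A usage sketch (local execution assumed; element values are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobsExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		env.fromElements(1, 2, 3).print();
		env.execute("first job"); // runs on the mini cluster, then clears the graph

		// because execute() called streamGraph.clear(), this builds a fresh graph
		env.fromElements(4, 5, 6).print();
		env.execute("second job");
	}
}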
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f66e394..24a08eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -33,7 +33,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,8 +137,7 @@
 			callUserFunction();
 		} catch (Exception e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
+				LOG.error("Calling user function failed", e);
 			}
 			throw new RuntimeException(e);
 		}
@@ -168,7 +166,7 @@
 		try {
 			FunctionUtils.closeFunction(userFunction);
 		} catch (Exception e) {
-			throw new RuntimeException("Error when closing the function: " + e.getMessage());
+			throw new RuntimeException("Error when closing the function", e);
 		}
 	}
@@ -187,8 +185,7 @@
 	public void setChainingStrategy(ChainingStrategy strategy) {
 		if (strategy == ChainingStrategy.ALWAYS) {
 			if (!(this instanceof ChainableStreamOperator)) {
-				throw new RuntimeException(
-						"Operator needs to extend ChainableOperator to be chained");
+				throw new RuntimeException("Operator needs to extend ChainableOperator to be chained");
 			}
 		}
 		this.chainingStrategy = strategy;

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
deleted file mode 100644
index 4dd4b45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Base class for representing operator states that can be repartitioned for
- * state state and load balancing.
- *
- * @param <T>
- *            The type of the operator state.
- */
-public abstract class PartitionableState<T> extends OperatorState<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	PartitionableState(T initialState) {
-		super(initialState);
-	}
-
-	/**
-	 * Repartitions(divides) the current state into the given number of new
-	 * partitions. The created partitions will be used to redistribute then
-	 * rebuild the state among the parallel instances of the operator. The
-	 * implementation should reflect the partitioning of the input values to
-	 * maintain correct operator behavior.
-	 *
-	 * </br> </br> It is also assumed that if we would {@link #reBuild} the
-	 * repartitioned state we would basically get the same as before.
-	 *
-	 *
-	 * @param numberOfPartitions
-	 *            The desired number of partitions. The method must return an
-	 *            array of that size.
-	 * @return The array containing the state part for each partition.
-	 */
-	public abstract OperatorState<T>[] repartition(int numberOfPartitions);
-
-	/**
-	 * Rebuilds the current state partition from the given parts. Used for
-	 * building the state after a re-balance phase.
-	 *
-	 * @param parts
-	 *            The state parts that will be used to rebuild the current
-	 *            partition.
-	 * @return The rebuilt operator state.
-	 */
-	public abstract OperatorState<T> reBuild(OperatorState<T>... parts);
-
-}
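For context on the removed contract: a hypothetical concrete PartitionableState, a summable counter, where reBuild(repartition(n)) yields the original value, as the deleted Javadoc requires. OperatorState's constructor and getState() are used exactly as in the deleted OperatorStateTest at the end of this commit:

package org.apache.flink.streaming.api.state; // same package, since the constructor is package-private

import org.apache.flink.runtime.state.OperatorState;

public class CountState extends PartitionableState<Long> {

	private static final long serialVersionUID = 1L;

	public CountState(Long initialCount) {
		super(initialCount);
	}

	@Override
	@SuppressWarnings("unchecked")
	public OperatorState<Long>[] repartition(int numberOfPartitions) {
		OperatorState<Long>[] parts = new OperatorState[numberOfPartitions];
		long share = getState() / numberOfPartitions;
		long remainder = getState() % numberOfPartitions;
		for (int i = 0; i < numberOfPartitions; i++) {
			// give the remainder to the first partition so no count is lost
			parts[i] = new OperatorState<Long>(i == 0 ? share + remainder : share);
		}
		return parts;
	}

	@Override
	public OperatorState<Long> reBuild(OperatorState<Long>... parts) {
		long sum = 0;
		for (OperatorState<Long> part : parts) {
			sum += part.getState();
		}
		return new OperatorState<Long>(sum);
	}
}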
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 930c9b4..0259568 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -273,7 +273,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
 		synchronized (checkpointLock) {
 			if (isRunning) {
 				try {
-					LOG.info("Starting checkpoint " + checkpointId);
+					LOG.info("Starting checkpoint {} on task {}", checkpointId, getName());
 
 					// first draw the state that should go into checkpoint
 					LocalStateHandle state;
@@ -282,7 +282,7 @@
 					state = userState == null ?
 							null : new LocalStateHandle(userState);
 				} catch (Exception e) {
-					throw new Exception("Error while drawing snapshot of the user state.");
+					throw new Exception("Error while drawing snapshot of the user state.", e);
 				}
 
 				// now emit the checkpoint barriers
@@ -333,8 +333,7 @@
 				triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
 			}
 			catch (Exception e) {
-				throw new RuntimeException(
-						"Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+				throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index faaa79b..87c9757 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
 import java.io.Serializable;
 
-public interface DeserializationSchema<T> extends Serializable {
+public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 	/**
 	 * Deserializes the incoming data.
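Since DeserializationSchema now extends ResultTypeQueryable, every custom schema must report its produced type; in exchange, sources built on a schema no longer need the removed GenericSourceFunction#getType() hook. A sketch with a hypothetical Event type (isEndOfStream is written without @Override so the example stays agnostic about whether this revision of the interface declares it):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventSchema implements DeserializationSchema<EventSchema.Event> {

	/** Hypothetical event type, carried on the wire as UTF-8 text. */
	public static class Event implements java.io.Serializable {
		public String payload;

		static Event fromBytes(byte[] bytes) {
			Event event = new Event();
			event.payload = new String(bytes);
			return event;
		}
	}

	@Override
	public Event deserialize(byte[] message) {
		return Event.fromBytes(message); // hypothetical decoding helper
	}

	// no @Override on purpose: harmless whether or not the interface declares it
	public boolean isEndOfStream(Event nextElement) {
		return false;
	}

	@Override
	public TypeInformation<Event> getProducedType() {
		// replaces the removed GenericSourceFunction#getType() mechanism
		return TypeExtractor.getForClass(Event.class);
	}
}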
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
index 93d13ab..a4b1419 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 public class JavaDefaultStringSchema implements DeserializationSchema<String>,
 		SerializationSchema<String, byte[]> {
@@ -38,4 +40,9 @@ public class JavaDefaultStringSchema implements DeserializationSchema<String>, S
 		return SerializationUtils.deserialize(message);
 	}
 
+	@Override
+	public TypeInformation<String> getProducedType() {
+		return TypeExtractor.getForClass(String.class);
+	}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
index e457bef..9c5885f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 public class RawSchema implements DeserializationSchema<byte[]>,
 		SerializationSchema<byte[], byte[]> {
 
@@ -36,4 +39,9 @@
 	public byte[] serialize(byte[] element) {
 		return element;
 	}
+
+	@Override
+	public TypeInformation<byte[]> getProducedType() {
+		return TypeExtractor.getForClass(byte[].class);
+	}
 }
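Why TypeExtractor.getForClass is enough here: it maps byte[] to Flink's primitive-array type information and String to a basic type, so these schemas report types the runtime already serializes efficiently. A quick standalone check (a sketch; the exact toString output of the type informations may vary by version):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class ProducedTypeCheck {
	public static void main(String[] args) {
		TypeInformation<byte[]> rawType = TypeExtractor.getForClass(byte[].class);
		TypeInformation<String> stringType = TypeExtractor.getForClass(String.class);

		// expected: a primitive byte-array type info and the basic String type info
		System.out.println(rawType);
		System.out.println(stringType);
	}
}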
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 3d0a0d5..7c5946d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 public class SimpleStringSchema implements DeserializationSchema<String>,
 		SerializationSchema<String, String> {
 
@@ -37,4 +40,8 @@
 		return element;
 	}
 
+	@Override
+	public TypeInformation<String> getProducedType() {
+		return TypeExtractor.getForClass(String.class);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
deleted file mode 100644
index 136a091..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
-	@Test
-	public void testOperatorState() {
-		OperatorState<Integer> os = new OperatorState<Integer>(5);
-
-		StateCheckpoint<Integer> scp = os.checkpoint();
-
-		assertTrue(os.stateEquals(scp.restore()));
-
-		assertEquals((Integer) 5, os.getState());
-
-		os.update(10);
-
-		assertEquals((Integer) 10, os.getState());
-	}
-
-}