Repository: flink Updated Branches: refs/heads/master 8ccd7544e -> 6968a57a1
[FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9 This closes #1597 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9173825a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9173825a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9173825a Branch: refs/heads/master Commit: 9173825aa6a1525d72a78cda16cb4ae1e9b8a8e4 Parents: 8ccd754 Author: Robert Metzger <rmetz...@apache.org> Authored: Sat Feb 6 13:27:06 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 10 15:12:34 2016 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer08.java | 2 +- .../kafka/internals/LegacyFetcher.java | 3 ++- .../connectors/kafka/Kafka08ITCase.java | 22 ++++++++++---------- .../kafka/KafkaTestEnvironmentImpl.java | 20 +++--------------- .../kafka/KafkaTestEnvironmentImpl.java | 22 +++++--------------- .../connectors/kafka/KafkaConsumerTestBase.java | 5 ++--- .../connectors/kafka/KafkaTestBase.java | 2 -- .../connectors/kafka/KafkaTestEnvironment.java | 3 --- .../flink/yarn/YARNSessionFIFOITCase.java | 1 + 9 files changed, 25 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index bdea37f..1cdfffe 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -70,7 +70,7 @@ import static com.google.common.base.Preconditions.checkNotNull; * <li>socket.timeout.ms</li> * <li>socket.receive.buffer.bytes</li> * <li>fetch.message.max.bytes</li> - * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li> + * <li>auto.offset.reset with the values "largest", "smallest"</li> * <li>fetch.wait.max.ms</li> * </ul> * </li> http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index fe7f777..10f4c41 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -576,7 +576,8 @@ public class LegacyFetcher implements Fetcher { private static long getInvalidOffsetBehavior(Properties config) { long timeType; - if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) { + String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); + if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 timeType = OffsetRequest.LatestTime(); } else { timeType = OffsetRequest.EarliestTime(); http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 6a2fa27..a3e815e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -93,13 +93,13 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // set invalid offset: CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topic, 0, 1234); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234); curatorClient.close(); // read from topic final int valuesCount = 20; final int startFrom = 0; - readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom); + readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom); deleteTestTopic(topic); } @@ -188,9 +188,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 0); - long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 1); - long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 2); + long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0); + long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1); + long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2); LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); @@ -201,9 +201,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { LOG.info("Manipulating offsets"); // set the offset to 50 for the three partitions - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 0, 49); - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 1, 49); - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 2, 49); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0, 49); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1, 49); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2, 49); curatorClient.close(); @@ -250,9 +250,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // get the offset CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 0); - long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 1); - long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 2); + long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); + long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); + long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 348b75d..6f56ede 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka; import kafka.admin.AdminUtils; import kafka.api.PartitionMetadata; import kafka.common.KafkaException; -import kafka.consumer.ConsumerConfig; import kafka.network.SocketServer; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; @@ -68,19 +67,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private ConsumerConfig standardCC; - public String getBrokerConnectionString() { return brokerConnectionString; } - - @Override - public ConsumerConfig getStandardConsumerConfig() { - return standardCC; - } - @Override public Properties getStandardProperties() { return standardProps; @@ -187,13 +178,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { standardProps.setProperty("auto.commit.enable", "false"); standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis. standardProps.setProperty("zookeeper.connection.timeout.ms", "20000"); - standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. + standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8) standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) - - Properties consumerConfigProps = new Properties(); - consumerConfigProps.putAll(standardProps); - consumerConfigProps.setProperty("auto.offset.reset", "smallest"); - standardCC = new ConsumerConfig(consumerConfigProps); } @Override @@ -274,8 +260,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } private ZkClient createZkClient() { - return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), - standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 0855ba6..50dcab8 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -65,19 +65,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private ConsumerConfig standardCC; - public String getBrokerConnectionString() { return brokerConnectionString; } @Override - public ConsumerConfig getStandardConsumerConfig() { - return standardCC; - } - - @Override public Properties getStandardProperties() { return standardProps; } @@ -184,13 +177,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { standardProps.setProperty("auto.commit.enable", "false"); standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis. standardProps.setProperty("zookeeper.connection.timeout.ms", "20000"); - standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value) standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) - - Properties consumerConfigProps = new Properties(); - consumerConfigProps.putAll(standardProps); - consumerConfigProps.setProperty("auto.offset.reset", "smallest"); - standardCC = new ConsumerConfig(consumerConfigProps); } @Override @@ -233,8 +221,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } public ZkUtils getZkUtils() { - ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), - standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); return ZkUtils.apply(creator, false); } @@ -280,8 +268,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { try { LOG.info("Deleting topic {}", topic); - ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), - standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); AdminUtils.deleteTopic(zkUtils, topic); http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 2d9f2fc..680e4ec 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1284,11 +1284,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { throws IOException { // write the sequence to log for debugging purposes - Properties stdProps = standardCC.props().props(); - Properties newProps = new Properties(stdProps); + Properties newProps = new Properties(standardProps); newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString()); newProps.setProperty("auto.offset.reset", "smallest"); - newProps.setProperty("zookeeper.connect", standardCC.zkConnect()); + newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect")); ConsumerConfig printerConfig = new ConsumerConfig(newProps); printTopic(topicName, printerConfig, deserializer, elements); http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 73cd2f9..ab1d5b6 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -66,7 +66,6 @@ public abstract class KafkaTestBase extends TestLogger { protected static String brokerConnectionStrings; - protected static ConsumerConfig standardCC; protected static Properties standardProps; protected static ForkableFlinkMiniCluster flink; @@ -98,7 +97,6 @@ public abstract class KafkaTestBase extends TestLogger { kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS); standardProps = kafkaServer.getStandardProperties(); - standardCC = kafkaServer.getStandardConsumerConfig(); brokerConnectionStrings = kafkaServer.getBrokerConnectionString(); // start also a re-usable Flink mini cluster http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 40be8a1..76a284b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.connectors.kafka; -import kafka.consumer.ConsumerConfig; import kafka.server.KafkaServer; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.DeserializationSchema; @@ -25,7 +24,6 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import java.net.UnknownHostException; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -45,7 +43,6 @@ public abstract class KafkaTestEnvironment { public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor); - public abstract ConsumerConfig getStandardConsumerConfig(); public abstract Properties getStandardProperties(); http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 8c9a9c7..98dc85f 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -155,6 +155,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); Assert.assertEquals(1, apps.size()); // Only one running ApplicationReport app = apps.get(0); + Assert.assertEquals("MyCustomName", app.getName()); ApplicationId id = app.getApplicationId(); yc.killApplication(id);