Repository: flink
Updated Branches:
  refs/heads/master 8ccd7544e -> 6968a57a1


[FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9

This closes #1597


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9173825a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9173825a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9173825a

Branch: refs/heads/master
Commit: 9173825aa6a1525d72a78cda16cb4ae1e9b8a8e4
Parents: 8ccd754
Author: Robert Metzger <rmetz...@apache.org>
Authored: Sat Feb 6 13:27:06 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 10 15:12:34 2016 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  2 +-
 .../kafka/internals/LegacyFetcher.java          |  3 ++-
 .../connectors/kafka/Kafka08ITCase.java         | 22 ++++++++++----------
 .../kafka/KafkaTestEnvironmentImpl.java         | 20 +++---------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 22 +++++---------------
 .../connectors/kafka/KafkaConsumerTestBase.java |  5 ++---
 .../connectors/kafka/KafkaTestBase.java         |  2 --
 .../connectors/kafka/KafkaTestEnvironment.java  |  3 ---
 .../flink/yarn/YARNSessionFIFOITCase.java       |  1 +
 9 files changed, 25 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index bdea37f..1cdfffe 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -70,7 +70,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  *             <li>socket.timeout.ms</li>
  *             <li>socket.receive.buffer.bytes</li>
  *             <li>fetch.message.max.bytes</li>
- *             <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ *             <li>auto.offset.reset with the values "largest", "smallest"</li>
  *             <li>fetch.wait.max.ms</li>
  *         </ul>
  *     </li>
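
For reference, here is a minimal usage sketch of the 0.8-style values the updated
javadoc documents. This is not part of the commit; the topic name, group id and
connection strings are placeholders.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class OffsetResetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");   // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        props.setProperty("group.id", "offset-reset-example");      // placeholder
        // Kafka 0.8 spelling; with this change the fetcher also accepts the
        // 0.9-style "earliest"/"latest" spellings.
        props.setProperty("auto.offset.reset", "smallest");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));
        stream.print();
        env.execute("auto.offset.reset example");
    }
}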

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index fe7f777..10f4c41 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -576,7 +576,8 @@ public class LegacyFetcher implements Fetcher {
 
                private static long getInvalidOffsetBehavior(Properties config) {
                        long timeType;
-                       if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
+                       String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+                       if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
                                timeType = OffsetRequest.LatestTime();
                        } else {
                                timeType = OffsetRequest.EarliestTime();
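
To make the accepted spellings explicit, here is a self-contained illustration of
the same mapping (illustration only, not the committed code; it inlines the
property key rather than using kafka's ConsumerConfig constant):

import java.util.Properties;

public class OffsetResetMapping {

    // true  -> start from the latest offset   ("largest" in 0.8, "latest" in 0.9)
    // false -> start from the earliest offset ("smallest" in 0.8, "earliest" in 0.9)
    static boolean startFromLatest(Properties config) {
        String val = config.getProperty("auto.offset.reset", "largest");
        return val.equals("largest") || val.equals("latest");
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("auto.offset.reset", "latest");    // 0.9 spelling
        System.out.println(startFromLatest(p));          // prints true

        p.setProperty("auto.offset.reset", "smallest");  // 0.8 spelling
        System.out.println(startFromLatest(p));          // prints false
    }
}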

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 6a2fa27..a3e815e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -93,13 +93,13 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
                // set invalid offset:
                CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topic, 0, 1234);
+               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
                curatorClient.close();
 
                // read from topic
                final int valuesCount = 20;
                final int startFrom = 0;
-               readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
+               readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
 
                deleteTestTopic(topic);
        }
@@ -188,9 +188,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
                CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-               long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 0);
-               long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 1);
-               long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 2);
+               long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
+               long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
+               long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);
 
                LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
@@ -201,9 +201,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                LOG.info("Manipulating offsets");
 
                // set the offset to 50 for the three partitions
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 0, 49);
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 1, 49);
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 2, 49);
+               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0, 49);
+               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1, 49);
+               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2, 49);
 
                curatorClient.close();
 
@@ -250,9 +250,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                // get the offset
                CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-               long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 0);
-               long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 1);
-               long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 2);
+               long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+               long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+               long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
 
                LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
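
The tests now read the consumer group from the plain standardProps instead of the
removed ConsumerConfig. A small hedged sketch of that pattern (the helper name is
made up; the ZookeeperOffsetHandler and CuratorFramework import paths are assumed
to be the classes used in the test above):

import java.util.Properties;

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class OffsetManipulationSketch {

    // Sets every partition of the topic to the given offset, mirroring the calls
    // visible in the diff above.
    static void setAllPartitionOffsets(CuratorFramework curator, Properties standardProps,
            String topic, int numPartitions, long offset) throws Exception {
        String groupId = standardProps.getProperty("group.id");
        for (int partition = 0; partition < numPartitions; partition++) {
            ZookeeperOffsetHandler.setOffsetInZooKeeper(curator, groupId, topic, partition, offset);
        }
    }
}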
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 348b75d..6f56ede 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.common.KafkaException;
-import kafka.consumer.ConsumerConfig;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -68,19 +67,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        private String zookeeperConnectionString;
        private String brokerConnectionString = "";
        private Properties standardProps;
-       private ConsumerConfig standardCC;
-
 
        public String getBrokerConnectionString() {
                return brokerConnectionString;
        }
 
-
-       @Override
-       public ConsumerConfig getStandardConsumerConfig() {
-               return standardCC;
-       }
-
        @Override
        public Properties getStandardProperties() {
                return standardProps;
@@ -187,13 +178,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                standardProps.setProperty("auto.commit.enable", "false");
                standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
                standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-               standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+               standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8)
                standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-               Properties consumerConfigProps = new Properties();
-               consumerConfigProps.putAll(standardProps);
-               consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-               standardCC = new ConsumerConfig(consumerConfigProps);
        }
 
        @Override
@@ -274,8 +260,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        }
 
        private ZkClient createZkClient() {
-               return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-                               standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+               return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
        }
 
        /**
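
The same pattern (deriving the ZooKeeper client settings from standardProps rather
than from a kafka.consumer.ConsumerConfig) also appears in the 0.9 test environment
below. A hedged sketch of the helper shape; the class name is made up and the import
paths for ZkClient and ZooKeeperStringSerializer are assumptions:

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;

public class ZkClientFromProps {

    static ZkClient createZkClient(String zookeeperConnectionString, Properties standardProps) {
        // Defaults mirror the values the test environment sets above.
        int sessionTimeout = Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms", "12000"));
        int connectionTimeout = Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms", "20000"));
        return new ZkClient(zookeeperConnectionString, sessionTimeout, connectionTimeout,
                new ZooKeeperStringSerializer());
    }
}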

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0855ba6..50dcab8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -65,19 +65,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        private String zookeeperConnectionString;
        private String brokerConnectionString = "";
        private Properties standardProps;
-       private ConsumerConfig standardCC;
-
 
        public String getBrokerConnectionString() {
                return brokerConnectionString;
        }
 
        @Override
-       public ConsumerConfig getStandardConsumerConfig() {
-               return standardCC;
-       }
-
-       @Override
        public Properties getStandardProperties() {
                return standardProps;
        }
@@ -184,13 +177,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                standardProps.setProperty("auto.commit.enable", "false");
                standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
                standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-               standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+               standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
                standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-               Properties consumerConfigProps = new Properties();
-               consumerConfigProps.putAll(standardProps);
-               consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-               standardCC = new ConsumerConfig(consumerConfigProps);
        }
 
        @Override
@@ -233,8 +221,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        }
 
        public ZkUtils getZkUtils() {
-               ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-                               standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+               ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
                return ZkUtils.apply(creator, false);
        }
 
@@ -280,8 +268,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                try {
                        LOG.info("Deleting topic {}", topic);
 
-                       ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-                                       standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+                       ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
 
                        AdminUtils.deleteTopic(zkUtils, topic);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2d9f2fc..680e4ec 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1284,11 +1284,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        throws IOException
        {
                // write the sequence to log for debugging purposes
-               Properties stdProps = standardCC.props().props();
-               Properties newProps = new Properties(stdProps);
+               Properties newProps = new Properties(standardProps);
                newProps.setProperty("group.id", "topic-printer"+ 
UUID.randomUUID().toString());
                newProps.setProperty("auto.offset.reset", "smallest");
-               newProps.setProperty("zookeeper.connect", 
standardCC.zkConnect());
+               newProps.setProperty("zookeeper.connect", 
standardProps.getProperty("zookeeper.connect"));
 
                ConsumerConfig printerConfig = new ConsumerConfig(newProps);
                printTopic(topicName, printerConfig, deserializer, elements);
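
One detail worth noting about the line above: new Properties(standardProps) keeps
standardProps as defaults, so getProperty(...) falls through to them while the
overrides set afterwards win. A small self-contained demonstration (illustration
only, with placeholder values):

import java.util.Properties;

public class PropertiesDefaultsDemo {
    public static void main(String[] args) {
        Properties standard = new Properties();
        standard.setProperty("group.id", "flink-tests");
        standard.setProperty("auto.offset.reset", "largest");

        Properties printerProps = new Properties(standard);  // 'standard' acts as defaults
        printerProps.setProperty("group.id", "topic-printer-1234");
        printerProps.setProperty("auto.offset.reset", "smallest");

        System.out.println(printerProps.getProperty("group.id"));           // topic-printer-1234
        System.out.println(printerProps.getProperty("auto.offset.reset"));  // smallest
    }
}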

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 73cd2f9..ab1d5b6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -66,7 +66,6 @@ public abstract class KafkaTestBase extends TestLogger {
 
        protected static String brokerConnectionStrings;
 
-       protected static ConsumerConfig standardCC;
        protected static Properties standardProps;
        
        protected static ForkableFlinkMiniCluster flink;
@@ -98,7 +97,6 @@ public abstract class KafkaTestBase extends TestLogger {
                kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
 
                standardProps = kafkaServer.getStandardProperties();
-               standardCC = kafkaServer.getStandardConsumerConfig();
                brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
 
                // start also a re-usable Flink mini cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 40be8a1..76a284b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import kafka.consumer.ConsumerConfig;
 import kafka.server.KafkaServer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
@@ -25,7 +24,6 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
-import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -45,7 +43,6 @@ public abstract class KafkaTestEnvironment {
 
        public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor);
 
-       public abstract ConsumerConfig getStandardConsumerConfig();
 
        public abstract Properties getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 8c9a9c7..98dc85f 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -155,6 +155,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                        List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
                        Assert.assertEquals(1, apps.size()); // Only one running
                        ApplicationReport app = apps.get(0);
+
                        Assert.assertEquals("MyCustomName", app.getName());
                        ApplicationId id = app.getApplicationId();
                        yc.killApplication(id);
