http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 0dbe865..213ba4a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -65,6 +65,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String brokerConnectionString = ""; private Properties standardProps; private Properties additionalServerProperties; + private boolean secureMode = false; + // 6 seconds is default. Seems to be too small for travis. 30 seconds + private String zkTimeout = "30000"; public String getBrokerConnectionString() { return brokerConnectionString; @@ -131,8 +134,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties) { + public boolean isSecureRunSupported() { + return true; + } + + @Override + public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { + + //increase the timeout since in Travis ZK connection takes long time for secure connection. 
+ if(secureMode) { + //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout + numKafkaServers = 1; + zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15); + } + this.additionalServerProperties = additionalServerProperties; + this.secureMode = secureMode; File tempDir = new File(System.getProperty("java.io.tmpdir")); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); @@ -155,6 +172,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { LOG.info("Starting Zookeeper"); zookeeper = new TestingServer(-1, tmpZkDir); zookeeperConnectionString = zookeeper.getConnectString(); + LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting KafkaServer"); brokers = new ArrayList<>(numKafkaServers); @@ -163,7 +181,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); SocketServer socketServer = brokers.get(i).socketServer(); - brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + if(secureMode) { + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; + } else { + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + } } LOG.info("ZK and KafkaServer started."); @@ -173,15 +195,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { fail("Test setup failed: " + t.getMessage()); } + LOG.info("brokerConnectionString --> {}", brokerConnectionString); + standardProps = new Properties(); standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); standardProps.setProperty("bootstrap.servers", brokerConnectionString); 
standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("auto.commit.enable", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. - standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); + standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout); + standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout); standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value) standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) + } @Override @@ -196,6 +221,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { if (zookeeper != null) { try { zookeeper.stop(); + zookeeper.close(); } catch (Exception e) { LOG.warn("ZK.stop() failed", e); @@ -224,6 +250,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } public ZkUtils getZkUtils() { + LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString); ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); return ZkUtils.apply(creator, false); @@ -241,23 +268,37 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { zkUtils.close(); } + LOG.info("Topic {} create request is successfully posted", topic); + // validate that the topic has been created - final long deadline = System.currentTimeMillis() + 30000; + final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout); do { try { - Thread.sleep(100); + if(secureMode) { + //increase wait time since in Travis ZK timeout occurs frequently + int wait = Integer.parseInt(zkTimeout) / 100; + LOG.info("waiting for {} msecs before the 
topic {} can be checked", wait, topic); + Thread.sleep(wait); + } else { + Thread.sleep(100); + } + } catch (InterruptedException e) { // restore interrupted state } // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are // not always correct. + LOG.info("Validating if the topic {} has been created or not", topic); + // create a new ZK utils connection ZkUtils checkZKConn = getZkUtils(); if(AdminUtils.topicExists(checkZKConn, topic)) { + LOG.info("topic {} has been created successfully", topic); checkZKConn.close(); return; } + LOG.info("topic {} has not been created yet. Will check again...", topic); checkZKConn.close(); } while (System.currentTimeMillis() < deadline); @@ -296,8 +337,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); // for CI stability, increase zookeeper session timeout - kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); - kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); + kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); + kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); if(additionalServerProperties != null) { kafkaProperties.putAll(additionalServerProperties); } @@ -307,6 +348,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { for (int i = 1; i <= numTries; i++) { int kafkaPort = NetUtils.getAvailablePort(); kafkaProperties.put("port", Integer.toString(kafkaPort)); + + //to support secure kafka cluster + if(secureMode) { + LOG.info("Adding Kafka secure configurations"); + kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); + kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); + kafkaProperties.putAll(getSecureProperties()); + } + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); try { @@ -329,4 +379,19 @@ public class KafkaTestEnvironmentImpl 
extends KafkaTestEnvironment { throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); } + /** Builds the additional Kafka settings used when the test environment runs in secure mode. A fresh Properties object is created on every call. When secureMode is false it is returned empty. When secureMode is true it carries: security.inter.broker.protocol and security.protocol set to SASL_PLAINTEXT, sasl.kerberos.service.name set to "kafka", the ZooKeeper session and connection timeouts set to the current zkTimeout value, and metadata.fetch.timeout.ms set to 120000. @return the secure-mode properties; never null, possibly empty */ public Properties getSecureProperties() { + Properties prop = new Properties(); + if(secureMode) { + prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); + prop.put("security.protocol", "SASL_PLAINTEXT"); + prop.put("sasl.kerberos.service.name", "kafka"); + + //add special timeout for Travis + prop.setProperty("zookeeper.session.timeout.ms", zkTimeout); + prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout); + prop.setProperty("metadata.fetch.timeout.ms","120000"); + } + return prop; + } + }
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties index fbeb110..4ac1773 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties @@ -28,3 +28,5 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger log4j.logger.org.apache.zookeeper=OFF, testlogger log4j.logger.state.change.logger=OFF, testlogger log4j.logger.kafka=OFF, testlogger + +log4j.logger.org.apache.directory=OFF, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml index 49d630f..ef71bde 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml @@ -161,6 +161,14 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <dependencyManagement> @@ -187,6 +195,17 @@ under the License. 
</execution> </executions> </plugin> + <!-- + https://issues.apache.org/jira/browse/DIRSHARED-134 + Required to pull the Mini-KDC transitive dependency + --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.1</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 920f15b..a87ff8a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -181,6 +181,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { properties.setProperty("session.timeout.ms", "2000"); properties.setProperty("fetch.max.wait.ms", "2000"); properties.setProperty("heartbeat.interval.ms", "1000"); + properties.putAll(secureProps); FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties); DataStream<String> stream = see.addSource(source); stream.print(); @@ -275,6 +276,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { }); Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); producerProperties.setProperty("retries", "3"); + 
producerProperties.putAll(secureProps); FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null); stream.addSink(prod); @@ -283,7 +285,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { List<String> topics = new ArrayList<>(); topics.add(topic); topics.add(additionalEmptyTopic); - FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps); + + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props); DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism); @@ -371,7 +377,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); env .addSource(kafkaSource) @@ -416,7 +426,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); env .addSource(kafkaSource) @@ -463,7 +476,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { 
env.getConfig().disableSysoutLogging(); env.setBufferTimeout(0); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); env .addSource(kafkaSource) @@ -506,7 +522,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.enableCheckpointing(100); env.getConfig().disableSysoutLogging(); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); env.addSource(source).addSink(new DiscardingSink<String>()); @@ -577,7 +596,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.enableCheckpointing(100); env.getConfig().disableSysoutLogging(); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); env.addSource(source).addSink(new DiscardingSink<String>()); @@ -629,7 +651,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.setParallelism(12); // needs to be more that the mini cluster has slots env.getConfig().disableSysoutLogging(); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); env 
.addSource(kafkaSource) @@ -700,15 +725,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig()); - stream.addSink(kafkaServer.getProducer("dummy", schema, standardProps, null)); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + + stream.addSink(kafkaServer.getProducer("dummy", schema, props, null)); env.execute("Write to topics"); // run second job consuming from multiple topics env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env.getConfig().disableSysoutLogging(); - - stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps)); + + stream = env.addSource(kafkaServer.getConsumer(topics, schema, props)); stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS); @@ -787,6 +816,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // Produce serialized JSON data createTestTopic(topic, 1, 1); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + StreamExecutionEnvironment env = StreamExecutionEnvironment .createRemoteEnvironment("localhost", flinkPort); env.getConfig().disableSysoutLogging(); @@ -805,7 +838,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { }).addSink(kafkaServer.getProducer( topic, new ByteArraySerializationSchema(), - standardProps, + props, null)); // Execute blocks @@ -940,6 +973,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14)); consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher consumerProps.setProperty("queued.max.message.chunks", "1"); + consumerProps.putAll(secureProps); 
FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps); DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source); @@ -969,6 +1003,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Properties producerProps = new Properties(); producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15)); producerProps.setProperty("retries", "3"); + producerProps.putAll(secureProps); producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings); DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() { @@ -1047,8 +1082,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.setRestartStrategy(RestartStrategies.noRestart()); env.getConfig().disableSysoutLogging(); - - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); env .addSource(kafkaSource) @@ -1097,6 +1134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); producerProperties.setProperty("retries", "3"); + producerProperties.putAll(secureProps); kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null)); env.execute("Write KV to Kafka"); @@ -1110,7 +1148,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, 
env.getConfig()); - DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps)); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props)); fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() { long counter = 0; @Override @@ -1178,6 +1219,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); producerProperties.setProperty("retries", "3"); + producerProperties.putAll(secureProps); + kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null)); env.execute("Write deletes to Kafka"); @@ -1189,7 +1232,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env.getConfig().disableSysoutLogging(); - DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props)); fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() { long counter = 0; @@ -1226,7 +1272,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env1.getConfig().disableSysoutLogging(); - DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps)); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + + 
DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props)); fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() { @Override public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception { @@ -1262,8 +1312,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env1.getConfig().disableSysoutLogging(); env1.disableOperatorChaining(); // let the source read everything into the network buffers + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig()); - DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props)); fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { @Override public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op @@ -1288,7 +1342,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } }); - fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null)); + fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null)); env1.execute("Metrics test job"); } catch(Throwable t) { @@ -1403,6 +1457,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser = new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig()); + cc.putAll(secureProps); // create the consumer FlinkKafkaConsumerBase<Tuple2<Integer, 
Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc); @@ -1505,6 +1560,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // the producer must not produce duplicates Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); producerProperties.setProperty("retries", "0"); + producerProperties.putAll(secureProps); stream.addSink(kafkaServer.getProducer( topicName, serSchema, producerProperties, @@ -1537,7 +1593,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Properties readProps = (Properties) standardProps.clone(); readProps.setProperty("group.id", "flink-tests-validator"); - + readProps.putAll(secureProps); FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps); readEnv @@ -1672,6 +1728,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString()); newProps.setProperty("auto.offset.reset", "smallest"); newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect")); + newProps.putAll(secureProps); ConsumerConfig printerConfig = new ConsumerConfig(newProps); printTopic(topicName, printerConfig, deserializer, elements); @@ -1893,8 +1950,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema = new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig()); + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer - .getConsumer(topics, sourceSchema, standardProps) + .getConsumer(topics, sourceSchema, props) .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor()); DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source); 
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 14e74f1..5bcf406 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -34,6 +34,7 @@ import org.apache.flink.test.util.SuccessException; import java.io.Serializable; +import java.util.Properties; import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; @@ -102,17 +103,24 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { } }) .setParallelism(1); + + Properties props = new Properties(); + props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings)); + props.putAll(secureProps); // sink partitions into stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(serSchema), - FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), + props, new CustomPartitioner(parallelism))) .setParallelism(parallelism); // ------ consuming topology --------- - - FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps); + + Properties consumerProps = new Properties(); + consumerProps.putAll(standardProps); + consumerProps.putAll(secureProps); + FlinkKafkaConsumerBase<Tuple2<Long, 
String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps); env.addSource(source).setParallelism(parallelism) http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index c4949ff..9236e78 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -36,6 +36,8 @@ import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,29 +62,39 @@ public class KafkaShortRetentionTestBase implements Serializable { private static Properties standardProps; private static LocalFlinkMiniCluster flink; + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + protected static Properties secureProps = new Properties(); + @BeforeClass public static void prepare() throws IOException, ClassNotFoundException { LOG.info("-------------------------------------------------------------------------"); LOG.info(" Starting KafkaShortRetentionTestBase "); LOG.info("-------------------------------------------------------------------------"); + Configuration flinkConfig = new Configuration(); + // dynamically load 
the implementation for the test Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); + if(kafkaServer.isSecureRunSupported()) { + secureProps = kafkaServer.getSecureProperties(); + } + Properties specificProperties = new Properties(); specificProperties.setProperty("log.retention.hours", "0"); specificProperties.setProperty("log.retention.minutes", "0"); specificProperties.setProperty("log.retention.ms", "250"); specificProperties.setProperty("log.retention.check.interval.ms", "100"); - kafkaServer.prepare(1, specificProperties); + kafkaServer.prepare(1, specificProperties, false); standardProps = kafkaServer.getStandardProperties(); // start also a re-usable Flink mini cluster - Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); @@ -98,6 +110,8 @@ public class KafkaShortRetentionTestBase implements Serializable { flink.shutdown(); } kafkaServer.shutdown(); + + secureProps.clear(); } /** @@ -151,12 +165,17 @@ public class KafkaShortRetentionTestBase implements Serializable { running = false; } }); - stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), standardProps, null)); + + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + + stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null)); // ----------- add consumer dataflow ---------- NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema(); - FlinkKafkaConsumerBase<String> source = 
kafkaServer.getConsumer(topic, deserSchema, standardProps); + FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props); DataStreamSource<String> consuming = env.addSource(source); consuming.addSink(new DiscardingSink<String>()); @@ -224,6 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable { Properties customProps = new Properties(); customProps.putAll(standardProps); + customProps.putAll(secureProps); customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps); @@ -255,6 +275,7 @@ public class KafkaShortRetentionTestBase implements Serializable { Properties customProps = new Properties(); customProps.putAll(standardProps); + customProps.putAll(secureProps); customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception try { http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 771db17..afdd158 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -31,6 +31,8 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; +import 
org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,61 +75,90 @@ public abstract class KafkaTestBase extends TestLogger { protected static KafkaTestEnvironment kafkaServer; + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + protected static Properties secureProps = new Properties(); + // ------------------------------------------------------------------------ // Setup and teardown of the mini clusters // ------------------------------------------------------------------------ @BeforeClass public static void prepare() throws IOException, ClassNotFoundException { + LOG.info("-------------------------------------------------------------------------"); LOG.info(" Starting KafkaTestBase "); LOG.info("-------------------------------------------------------------------------"); - + startClusters(false); - // dynamically load the implementation for the test - Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); - kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); + } - LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); + @AfterClass + public static void shutDownServices() { - kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS); + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" Shut down KafkaTestBase "); + LOG.info("-------------------------------------------------------------------------"); - standardProps = kafkaServer.getStandardProperties(); - brokerConnectionStrings = kafkaServer.getBrokerConnectionString(); + shutdownClusters(); - // start also a re-usable Flink mini cluster - Configuration flinkConfig = new Configuration(); + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" KafkaTestBase finished"); + LOG.info("-------------------------------------------------------------------------"); + 
} + + protected static Configuration getFlinkConfiguration() { + Configuration flinkConfig = new Configuration();; flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + return flinkConfig; + } + + protected static void startClusters(boolean secureMode) throws ClassNotFoundException { + + // dynamically load the implementation for the test + Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); + kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); + + LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); + + kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode); + + standardProps = kafkaServer.getStandardProperties(); - flink = new LocalFlinkMiniCluster(flinkConfig, false); + brokerConnectionStrings = kafkaServer.getBrokerConnectionString(); + + if(kafkaServer.isSecureRunSupported() && secureMode) { + secureProps = kafkaServer.getSecureProperties(); + } + + // start also a re-usable Flink mini cluster + flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false); flink.start(); flinkPort = flink.getLeaderRPCPort(); - } - @AfterClass - public static void shutDownServices() { + } - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Shut down KafkaTestBase "); - LOG.info("-------------------------------------------------------------------------"); + protected static void shutdownClusters() { flinkPort = -1; if (flink != null) { 
flink.shutdown(); } + if(secureProps != null) { + secureProps.clear(); + } + kafkaServer.shutdown(); - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" KafkaTestBase finished"); - LOG.info("-------------------------------------------------------------------------"); } @@ -164,4 +195,5 @@ public abstract class KafkaTestBase extends TestLogger { protected static void deleteTestTopic(String topic) { kafkaServer.deleteTestTopic(topic); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 0b1d51d..6ecde71 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -35,10 +35,10 @@ public abstract class KafkaTestEnvironment { protected static final String KAFKA_HOST = "localhost"; - public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties); + public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode); - public void prepare(int numberOfKafkaServers) { - this.prepare(numberOfKafkaServers, null); + public void prepare(int numberOfKafkaServers, boolean secureMode) { + this.prepare(numberOfKafkaServers, null, secureMode); } public abstract void shutdown(); @@ -51,9 +51,10 @@ public abstract class 
KafkaTestEnvironment { this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties()); } - public abstract Properties getStandardProperties(); + public abstract Properties getSecureProperties(); + public abstract String getBrokerConnectionString(); public abstract String getVersion(); @@ -86,4 +87,6 @@ public abstract class KafkaTestEnvironment { public abstract int getBrokerId(KafkaServer server); + public abstract boolean isSecureRunSupported(); + } http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index 5a38e56..58a5cc3 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -135,11 +135,18 @@ public class DataGenerators { } }); + Properties props = new Properties(); + props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString())); + Properties secureProps = testServer.getSecureProperties(); + if(secureProps != null) { + props.putAll(testServer.getSecureProperties()); + } + stream .rebalance() .addSink(testServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())), - 
FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()), + props, new KafkaPartitioner<Integer>() { @Override public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/pom.xml ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index 18ecfde..5c99ef6 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -78,5 +78,30 @@ under the License. <scope>compile</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + + <!-- + https://issues.apache.org/jira/browse/DIRSHARED-134 + Required to pull the Mini-KDC transitive dependency + --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.1</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java index a478908..6ec6c2c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java @@ -25,6 +25,8 @@ import org.apache.flink.test.util.TestBaseUtils; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Base class for streaming unit tests that run multiple tests and want to reuse the same @@ -67,18 +69,22 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase { super(new Configuration()); } + protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class); + // ------------------------------------------------------------------------ // Cluster setup & teardown // ------------------------------------------------------------------------ @BeforeClass public static void setup() throws Exception { + LOG.info("In StreamingMultipleProgramsTestBase: Starting FlinkMiniCluster "); cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true); TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); } @AfterClass public static void teardown() throws Exception { + LOG.info("In StreamingMultipleProgramsTestBase: Closing FlinkMiniCluster "); TestStreamEnvironment.unsetAsContext(); stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); } http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java new file mode 100644 index 0000000..00b19f1 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java @@ -0,0 +1,249 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.hadoop.minikdc.MiniKdc; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileWriter; +import java.io.BufferedWriter; +import java.io.PrintWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Helper {@link SecureTestEnvironment} to handle MiniKDC lifecycle. 
+ * This class can be used to start/stop MiniKDC and create secure configurations for MiniDFSCluster + * and MiniYarn + */ + +public class SecureTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(SecureTestEnvironment.class); + + private static MiniKdc kdc; + + private static String testKeytab = null; + + private static String testPrincipal = null; + + private static String testZkServerPrincipal = null; + + private static String testZkClientPrincipal = null; + + private static String testKafkaServerPrincipal = null; + + private static String hadoopServicePrincipal = null; + + private static File baseDirForSecureRun = null; + + public static void prepare(TemporaryFolder tempFolder) { + + try { + baseDirForSecureRun = tempFolder.newFolder(); + LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun); + + String hostName = "localhost"; + Properties kdcConf = MiniKdc.createConf(); + if(LOG.isDebugEnabled()) { + kdcConf.setProperty(MiniKdc.DEBUG, "true"); + } + kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName); + kdc = new MiniKdc(kdcConf, baseDirForSecureRun); + kdc.start(); + LOG.info("Started Mini KDC"); + + File keytabFile = new File(baseDirForSecureRun, "test-users.keytab"); + testKeytab = keytabFile.getAbsolutePath(); + testZkServerPrincipal = "zookeeper/127.0.0.1"; + testZkClientPrincipal = "zk-client/127.0.0.1"; + testKafkaServerPrincipal = "kafka/" + hostName; + hadoopServicePrincipal = "hadoop/" + hostName; + testPrincipal = "client/" + hostName; + + kdc.createPrincipal(keytabFile, testPrincipal, testZkServerPrincipal, + hadoopServicePrincipal, + testZkClientPrincipal, + testKafkaServerPrincipal); + + testPrincipal = testPrincipal + "@" + kdc.getRealm(); + testZkServerPrincipal = testZkServerPrincipal + "@" + kdc.getRealm(); + testZkClientPrincipal = testZkClientPrincipal + "@" + kdc.getRealm(); + testKafkaServerPrincipal = testKafkaServerPrincipal + "@" + kdc.getRealm(); + hadoopServicePrincipal = 
hadoopServicePrincipal + "@" + kdc.getRealm(); + + LOG.info("-------------------------------------------------------------------"); + LOG.info("Test Principal: {}", testPrincipal); + LOG.info("Test ZK Server Principal: {}", testZkServerPrincipal); + LOG.info("Test ZK Client Principal: {}", testZkClientPrincipal); + LOG.info("Test Kafka Server Principal: {}", testKafkaServerPrincipal); + LOG.info("Test Hadoop Service Principal: {}", hadoopServicePrincipal); + LOG.info("Test Keytab: {}", testKeytab); + LOG.info("-------------------------------------------------------------------"); + + //Security Context is established to allow non hadoop applications that requires JAAS + //based SASL/Kerberos authentication to work. However, for Hadoop specific applications + //the context can be reinitialized with Hadoop configuration by calling + //ctx.setHadoopConfiguration() for the UGI implementation to work properly. + //See Yarn test case module for reference + createJaasConfig(baseDirForSecureRun); + SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration(); + Configuration flinkConfig = new Configuration(); + flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab); + flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal); + flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false); + flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath()); + ctx.setFlinkConfiguration(flinkConfig); + TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap()); + + populateSystemEnvVariables(); + + } catch(Exception e) { + LOG.error("Exception occured while preparing secure environment. 
Reason: {}", e); + throw new RuntimeException(e); + } + + } + + public static void cleanup() { + + LOG.info("Cleaning up Secure Environment"); + + if( kdc != null) { + kdc.stop(); + LOG.info("Stopped KDC server"); + } + + resetSystemEnvVariables(); + + testKeytab = null; + testPrincipal = null; + testZkServerPrincipal = null; + hadoopServicePrincipal = null; + baseDirForSecureRun = null; + + } + + private static void populateSystemEnvVariables() { + + if(LOG.isDebugEnabled()) { + System.setProperty("FLINK_JAAS_DEBUG", "true"); + System.setProperty("sun.security.krb5.debug", "true"); + } + + System.setProperty("java.security.krb5.conf", kdc.getKrb5conf().getAbsolutePath()); + + System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); + System.setProperty("zookeeper.kerberos.removeHostFromPrincipal", "true"); + System.setProperty("zookeeper.kerberos.removeRealmFromPrincipal", "true"); + } + + private static void resetSystemEnvVariables() { + System.clearProperty("java.security.krb5.conf"); + System.clearProperty("FLINK_JAAS_DEBUG"); + System.clearProperty("sun.security.krb5.debug"); + + System.clearProperty("zookeeper.authProvider.1"); + System.clearProperty("zookeeper.kerberos.removeHostFromPrincipal"); + System.clearProperty("zookeeper.kerberos.removeRealmFromPrincipal"); + } + + public static org.apache.flink.configuration.Configuration populateFlinkSecureConfigurations( + @Nullable org.apache.flink.configuration.Configuration flinkConf) { + + org.apache.flink.configuration.Configuration conf; + + if(flinkConf== null) { + conf = new org.apache.flink.configuration.Configuration(); + } else { + conf = flinkConf; + } + + conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab); + conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal); + + return conf; + } + + public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap() { + + Map<String, 
TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap = new HashMap<>(); + + if(testZkServerPrincipal != null ) { + TestingSecurityContext.ClientSecurityConfiguration zkServer = + new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab, + "Server", "zk-server"); + clientSecurityConfigurationMap.put("Server",zkServer); + } + + if(testZkClientPrincipal != null ) { + TestingSecurityContext.ClientSecurityConfiguration zkClient = + new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab, + "Client", "zk-client"); + clientSecurityConfigurationMap.put("Client",zkClient); + } + + if(testKafkaServerPrincipal != null ) { + TestingSecurityContext.ClientSecurityConfiguration kafkaServer = + new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab, + "KafkaServer", "kafka-server"); + clientSecurityConfigurationMap.put("KafkaServer",kafkaServer); + } + + return clientSecurityConfigurationMap; + } + + public static String getTestKeytab() { + return testKeytab; + } + + public static String getHadoopServicePrincipal() { + return hadoopServicePrincipal; + } + + /* + * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL + * implementation lookup of java.security.auth.login.config + */ + private static void createJaasConfig(File baseDirForSecureRun) { + + try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun,SecurityContext.JAAS_CONF_FILENAME), true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) + { + out.println("sample {"); + out.println("useKeyTab=false"); + out.println("useTicketCache=true;"); + out.println("};"); + } catch (IOException e) { + LOG.error("Exception occured while trying to create JAAS config. 
Reason: {}", e.getMessage()); + throw new RuntimeException(e); + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java new file mode 100644 index 0000000..25b2362 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.security.JaasConfiguration; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import java.util.HashMap; +import java.util.Map; + +/** + * {@link TestingJaasConfiguration} for handling the integration test cases, which need to manage the + * client principal as well as the server principals of Hadoop/ZK, which expect the host name to be populated + * in a specific way (localhost vs 127.0.0.1). This provides an abstraction to handle more than one Login Module, + * since the default {@link JaasConfiguration} behavior only supports a global/unique principal identifier. + */ + +@Internal +public class TestingJaasConfiguration extends JaasConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class); + + public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap; + + TestingJaasConfiguration(String keytab, String principal, Map<String, + TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) { + super(keytab, principal); + this.clientSecurityConfigurationMap = clientSecurityConfigurationMap; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) { + + LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName); + + AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName); + + if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) { + + if(clientSecurityConfigurationMap.containsKey(applicationName)) { + + LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName); + + TestingSecurityContext.ClientSecurityConfiguration 
conf = clientSecurityConfigurationMap.get(applicationName); + + if(appConfigurationEntry != null && appConfigurationEntry.length > 0) { + + for(int count=0; count < appConfigurationEntry.length; count++) { + + AppConfigurationEntry ace = appConfigurationEntry[count]; + + if (ace.getOptions().containsKey("keyTab")) { + + String keyTab = conf.getKeytab(); + String principal = conf.getPrincipal(); + + LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " + + "use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal); + + Map<String, String> newKeytabKerberosOptions = new HashMap<>(); + newKeytabKerberosOptions.putAll(getKeytabKerberosOptions()); + + newKeytabKerberosOptions.put("keyTab", keyTab); + newKeytabKerberosOptions.put("principal", principal); + + AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + newKeytabKerberosOptions); + appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce}; + + LOG.debug("---->Login Module is using Keytab based configuration<------"); + LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName()); + LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag()); + LOG.debug("Options: " + keytabKerberosAce.getOptions()); + } + } + } + } + + } + + return appConfigurationEntry; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java new file mode 100644 index 0000000..5e84c7e --- /dev/null +++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.security.SecurityContext; + +import java.util.Map; + +/* + * Test security context to support handling both client and server principals in MiniKDC. + * This class is used only in integration test code for connectors like Kafka, HDFS, etc. + */ +@Internal +public class TestingSecurityContext { + + public static void install(SecurityContext.SecurityConfiguration config, + Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) + throws Exception { + + SecurityContext.install(config); + + // establish the JAAS config for Test environment + TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(), + config.getPrincipal(), clientSecurityConfigurationMap); + javax.security.auth.login.Configuration.setConfiguration(jaasConfig); + } + + public static class ClientSecurityConfiguration { + + private String principal; + + private String keytab; + + private String moduleName; + + private 
String jaasServiceName; + + public String getPrincipal() { + return principal; + } + + public String getKeytab() { + return keytab; + } + + public String getModuleName() { + return moduleName; + } + + public String getJaasServiceName() { + return jaasServiceName; + } + + public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) { + this.principal = principal; + this.keytab = keytab; + this.moduleName = moduleName; + this.jaasServiceName = jaasServiceName; + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index ffdca36..68e4752 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -103,6 +103,13 @@ under the License. <artifactId>akka-testkit_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> + </dependency> + </dependencies> <build> @@ -298,6 +305,19 @@ under the License. 
<skip>true</skip> </configuration> </plugin> + + <!-- + https://issues.apache.org/jira/browse/DIRSHARED-134 + Required to pull the Mini-KDC transitive dependency + --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.1</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index d03d9eb..a503115 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -180,7 +180,7 @@ public class FlinkYarnSessionCliTest { Mockito.mock(YarnClient.class), Mockito.mock(ApplicationReport.class), config, - new Path("/tmp"), false); + new Path("/temp"), false); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index a293348..9d6ff85 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -50,14 +50,14 @@ import java.util.concurrent.TimeUnit; public class YARNHighAvailabilityITCase extends YarnTestBase { - private static TestingServer zkServer; + protected static 
TestingServer zkServer; - private static ActorSystem actorSystem; + protected static ActorSystem actorSystem; - private static final int numberApplicationAttempts = 10; + protected static final int numberApplicationAttempts = 10; @Rule - public TemporaryFolder tmp = new TemporaryFolder(); + public TemporaryFolder temp = new TemporaryFolder(); @BeforeClass public static void setup() { @@ -108,7 +108,11 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); flinkYarnClient.setConfigurationDirectory(confDirPath); - String fsStateHandlePath = tmp.getRoot().getPath(); + String fsStateHandlePath = temp.getRoot().getPath(); + + // load the configuration + File configDirectory = new File(confDirPath); + GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration()); flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index ddea4dd..650397d 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Joiner; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; import 
org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -516,6 +518,27 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { } catch(Throwable t) { LOG.warn("Error while detached yarn session was running", t); Assert.fail(t.getMessage()); + } finally { + + //cleanup the yarn-properties file + String confDirPath = System.getenv("FLINK_CONF_DIR"); + File configDirectory = new File(confDirPath); + LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + configDirectory.getAbsolutePath()); + + // load the configuration + LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file"); + GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); + + try { + File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration()); + if(yarnPropertiesFile.exists()) { + LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath()); + yarnPropertiesFile.delete(); + } + } catch (Exception e) { + LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 
3caa0ee..ca696f9 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -100,6 +100,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase { while(getRunningContainers() < 2) { sleep(500); } + + //additional sleep for the JM/TM to start and establish connection + sleep(2000); LOG.info("Two containers are running. Killing the application"); // kill application "externally". @@ -121,6 +124,27 @@ public class YARNSessionFIFOITCase extends YarnTestBase { } catch(Throwable t) { LOG.warn("Killing failed", t); Assert.fail(); + } finally { + + //cleanup the yarn-properties file + String confDirPath = System.getenv("FLINK_CONF_DIR"); + File configDirectory = new File(confDirPath); + LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + configDirectory.getAbsolutePath()); + + // load the configuration + LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file"); + GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); + + try { + File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration()); + if(yarnPropertiesFile.exists()) { + LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath()); + yarnPropertiesFile.delete(); + } + } catch (Exception e) { + LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e); + } + } LOG.info("Finished testDetachedMode()"); http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java new file mode 100644 index 0000000..0b7c230 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.test.util.SecureTestEnvironment; +import org.apache.flink.test.util.TestingSecurityContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Variant of {@link YARNSessionFIFOITCase} whose YARN test cluster is started through {@code startYARNSecureMode} with the principal and keytab supplied by {@code SecureTestEnvironment}; most inherited tests are overridden with empty bodies (see the note above those overrides). */ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase { + + protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class); + + /** Configures the FIFO scheduler, prepares {@code SecureTestEnvironment}, writes the secure settings into {@code yarnConfiguration}, installs the testing security context and starts YARN inside {@code runSecured}. Any exception raised while installing the context or starting YARN is rethrown as a {@link RuntimeException} carrying the original exception as its cause. */ @BeforeClass + public static void setup() { + + LOG.info("starting secure cluster environment for testing"); + + yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); + yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured"); + + SecureTestEnvironment.prepare(tmp); + + populateYarnSecureConfigurations(yarnConfiguration,SecureTestEnvironment.getHadoopServicePrincipal(), + SecureTestEnvironment.getTestKeytab()); + + Configuration flinkConfig = new Configuration(); + flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, + SecureTestEnvironment.getTestKeytab()); + flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, + SecureTestEnvironment.getHadoopServicePrincipal()); + + SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration(); + ctx.setFlinkConfiguration(flinkConfig); + ctx.setHadoopConfiguration(yarnConfiguration); + try { + 
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap()); + + SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { + @Override + public Integer run() { + startYARNSecureMode(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(), + SecureTestEnvironment.getTestKeytab()); + return null; + } + }); + + } catch(Exception e) { + /* RuntimeException does not expand "{}" placeholders the way the logger does, so the message carries no placeholder; the failure detail travels in the attached cause. */ throw new RuntimeException("Exception occurred while setting up secure test context.", e); + } + + } + + /** Logs the teardown and calls {@code SecureTestEnvironment.cleanup()}. */ @AfterClass + public static void teardownSecureCluster() throws Exception { + LOG.info("tearing down secure cluster environment"); + SecureTestEnvironment.cleanup(); + } + + /* For secure cluster testing, it is enough to run only one test and override below test methods + * to keep the overall build time minimal + */ + @Override + public void testQueryCluster() {} + + @Override + public void testNonexistingQueue() {} + + @Override + public void testResourceComputation() {} + + @Override + public void testfullAlloc() {} + + @Override + public void testJavaAPI() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 6270010..605aa44 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.yarn; import akka.actor.Identify; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.program.ClusterClient; @@ -29,6 +30,8 @@ import 
org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -62,6 +65,8 @@ import java.io.FileWriter; import java.io.FilenameFilter; import java.io.IOException; import java.io.PrintStream; +import java.io.BufferedWriter; +import java.io.PrintWriter; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -123,6 +128,11 @@ public abstract class YarnTestBase extends TestLogger { */ protected static File flinkLibFolder; + /** + * Temporary folder where Flink configurations will be kept for secure run + */ + protected static File tempConfPathForSecureRun = null; + static { yarnConfiguration = new YarnConfiguration(); yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); @@ -140,6 +150,23 @@ public abstract class YarnTestBase extends TestLogger { } + /** Modifies {@code conf} in place: sets the Hadoop authentication mode to "kerberos" and authorization to "true", then writes the given {@code principal} and {@code keytab} into the RM_* and NM_* principal/keytab keys and the matching web-app SPNEGO keys, and finally sets an auth_to_local rule. Neither argument is validated here. */ public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) { + + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + + conf.set(YarnConfiguration.RM_KEYTAB, keytab); + conf.set(YarnConfiguration.RM_PRINCIPAL, principal); + conf.set(YarnConfiguration.NM_KEYTAB, keytab); + conf.set(YarnConfiguration.NM_PRINCIPAL, principal); + + conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal); + conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab); + conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal); + conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab); + + 
/* NOTE(review): presumably these rules map one- and two-component principals to their first component - confirm against the Hadoop auth_to_local rule syntax. */ conf.set("hadoop.security.auth_to_local","RULE:[1:$1] RULE:[2:$1]"); + } /** * Sleep a bit between the tests (we are re-using the YARN cluster for the tests) @@ -336,8 +363,16 @@ public abstract class YarnTestBase extends TestLogger { return count; } + public static void startYARNSecureMode(Configuration conf, String principal, String keytab) { + start(conf, principal, keytab); + } + public static void startYARNWithConfig(Configuration conf) { - // set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file + start(conf,null,null); + } + + private static void start(Configuration conf, String principal, String keytab) { + // set the home directory to a temp directory. Flink on YARN is using the home dir to distribute the file File homeDir = null; try { homeDir = tmp.newFolder(); @@ -374,7 +409,39 @@ public abstract class YarnTestBase extends TestLogger { File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"})); Assert.assertNotNull(flinkConfDirPath); - map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent()); + if(!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) { + //copy conf dir to test temporary workspace location + tempConfPathForSecureRun = tmp.newFolder("conf"); + + String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath(); + FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); + + try(FileWriter fw = new FileWriter(new File(tempConfPathForSecureRun,"flink-conf.yaml"), true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) + { + LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file"); + out.println(""); + out.println("#Security Configurations Auto Populated "); + out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab); + out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal); + out.println(""); + } catch 
(IOException e) { + LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage()); + throw new RuntimeException(e); + } + + String configDir = tempConfPathForSecureRun.getAbsolutePath(); + + LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir); + + Assert.assertNotNull(configDir); + + map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir); + + } else { + map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent()); + } File yarnConfFile = writeYarnSiteConfigXML(conf); map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); @@ -392,6 +459,7 @@ public abstract class YarnTestBase extends TestLogger { LOG.error("setup failure", ex); Assert.fail(); } + } /** @@ -421,7 +489,6 @@ public abstract class YarnTestBase extends TestLogger { System.setOut(new PrintStream(outContent)); System.setErr(new PrintStream(errContent)); - final int START_TIMEOUT_SECONDS = 60; Runner runner = new Runner(args, type); @@ -624,12 +691,23 @@ public abstract class YarnTestBase extends TestLogger { @AfterClass public static void teardown() throws Exception { + + LOG.info("Stopping MiniYarn Cluster"); + yarnCluster.stop(); + // Unset FLINK_CONF_DIR, as it might change the behavior of other tests Map<String, String> map = new HashMap<>(System.getenv()); map.remove(ConfigConstants.ENV_FLINK_CONF_DIR); + map.remove("YARN_CONF_DIR"); + map.remove("IN_TESTS"); TestBaseUtils.setEnv(map); - // When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files) + if(tempConfPathForSecureRun != null) { + FileUtil.fullyDelete(tempConfPathForSecureRun); + tempConfPathForSecureRun = null; + } + + // When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files) // to <flinkRoot>/target/flink-yarn-tests-*. // The files from there are picked up by the ./tools/travis_watchdog.sh script // to upload them to Amazon S3. 
@@ -646,6 +724,7 @@ public abstract class YarnTestBase extends TestLogger { LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e); } } + } public static boolean isOnTravis() { http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties index e94ca26..8f56c1f 100644 --- a/flink-yarn-tests/src/test/resources/log4j-test.properties +++ b/flink-yarn-tests/src/test/resources/log4j-test.properties @@ -34,3 +34,8 @@ log4j.logger.org.apache.hadoop=OFF log4j.logger.org.apache.flink.runtime.leaderelection=INFO log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO +log4j.logger.org.apache.directory=OFF +log4j.logger.org.mortbay.log=OFF, testlogger +log4j.logger.net.sf.ehcache=OFF +log4j.logger.org.apache.hadoop.metrics2=OFF +log4j.logger.org.apache.hadoop.conf.Configuration=OFF
