[FLINK-3929] conditionally skip RollingSinkSecuredITCase

- for now, we skip this test class until Hadoop version 3.x.x.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/285b6f74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/285b6f74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/285b6f74

Branch: refs/heads/master
Commit: 285b6f74e0416b200229bd67cb7521e4a6871bbc
Parents: 25a622f
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Sep 1 12:49:53 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java | 37 +++++++++++++++-----
 .../connectors/kafka/KafkaTestBase.java         |  8 +++--
 2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 86cedaf..930ddd2 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,9 +31,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.VersionInfo;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,15 +58,31 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 
 /**
  * Tests for running {@link RollingSinkSecuredITCase} which is an extension of 
{@link RollingSink} in secure environment
+ * Note: only executed for Hadoop version > 3.x.x
  */
-
-//The test is disabled since MiniDFS secure run requires lower order ports to 
be used.
-//We can enable the test when the fix is available (HDFS-9213)
-@Ignore
 public class RollingSinkSecuredITCase extends RollingSinkITCase {
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
 
+       /**
+        * Skips all tests if the Hadoop version doesn't match.
+        * We can't run this test class until HDFS-9213 is fixed which allows a 
secure DataNode
+        * to bind to non-privileged ports for testing.
+        * For now, we skip this test class until Hadoop version 3.x.x.
+        */
+       private static void skipIfHadoopVersionIsNotAppropriate() {
+               // Skips all tests if the Hadoop version doesn't match
+               String hadoopVersionString = VersionInfo.getVersion();
+               String[] split = hadoopVersionString.split("\\.");
+               if (split.length != 3) {
+                       throw new IllegalStateException("Hadoop version was not 
of format 'X.X.X': " + hadoopVersionString);
+               }
+               Assume.assumeTrue(
+                       // check whether we're running Hadoop version >= 3.x.x
+                       Integer.parseInt(split[0]) >= 3
+               );
+       }
+
        /*
         * override super class static methods to avoid creating MiniDFS and 
MiniFlink with wrong configurations
         * and out-of-order sequence for secure cluster
@@ -85,6 +102,8 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
        @BeforeClass
        public static void startSecureCluster() throws Exception {
 
+               skipIfHadoopVersionIsNotAppropriate();
+
                LOG.info("starting secure cluster environment for testing");
 
                dataDir = tempFolder.newFolder();
@@ -143,7 +162,9 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
                TestStreamEnvironment.unsetAsContext();
                stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 
-               hdfsCluster.shutdown();
+               if (hdfsCluster != null) {
+                       hdfsCluster.shutdown();
+               }
 
                SecureTestEnvironment.cleanup();
        }
@@ -229,4 +250,4 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
        @Override
        public void testDateTimeRollingStringWriter() throws Exception {}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/285b6f74/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index afdd158..5cec4f0 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -110,7 +110,7 @@ public abstract class KafkaTestBase extends TestLogger {
        }
 
        protected static Configuration getFlinkConfiguration() {
-               Configuration flinkConfig = new Configuration();;
+               Configuration flinkConfig = new Configuration();
                
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -134,7 +134,11 @@ public abstract class KafkaTestBase extends TestLogger {
 
                brokerConnectionStrings = 
kafkaServer.getBrokerConnectionString();
 
-               if(kafkaServer.isSecureRunSupported() && secureMode) {
+               if (secureMode) {
+                       if (!kafkaServer.isSecureRunSupported()) {
+                               throw new IllegalStateException(
+                                       "Attempting to test in secure mode but 
secure mode not supported by the KafkaTestEnvironment.");
+                       }
                        secureProps = kafkaServer.getSecureProperties();
                }
 

Reply via email to