This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fafe0f3  [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
fafe0f3 is described below

commit fafe0f311cc1c48002b68f26ab9b274ffd565665
Author: Kent Yao <yaooq...@hotmail.com>
AuthorDate: Thu May 7 14:37:03 2020 +0900

    [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
    
    ### What changes were proposed in this pull request?
    The `Kafka*Suite`s are flaky because of the Hadoop MiniKdc issue - https://issues.apache.org/jira/browse/HADOOP-12656
    > Looking at MiniKdc implementation, if port is 0, the constructor use ServerSocket to find an unused port, assign the port number to the member variable port and close the ServerSocket object; later, in initKDCServer(), instantiate a TcpTransport object and bind at that port.
    
    > It appears that the port may be used in between, and then throw the exception.
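    
    The race quoted above can be reproduced outside MiniKdc. The following is only a sketch, not MiniKdc's actual code: `PortRaceSketch`, `probeFreePort`, and `bindFails` are hypothetical names, and the "intruder" socket stands in for whatever other service happens to claim the port between the probe and the real bind.
    
    ```scala
    import java.net.{BindException, ServerSocket}
    
    object PortRaceSketch {
      // Mimics the probe step: ask the OS for a free port, then release it.
      def probeFreePort(): Int = {
        val probe = new ServerSocket(0)
        try probe.getLocalPort finally probe.close()
      }
    
      // Returns true if binding to `port` fails because it is already taken.
      def bindFails(port: Int): Boolean =
        try { new ServerSocket(port).close(); false }
        catch { case _: BindException => true }
    
      def main(args: Array[String]): Unit = {
        val port = probeFreePort()
        // Assumption for illustration: another service grabs the port in the gap.
        val intruder = new ServerSocket(port)
        try println(s"late bind fails: ${bindFails(port)}")
        finally intruder.close()
      }
    }
    ```
    
    The window between `probeFreePort()` returning and the later bind is what makes the failure intermittent: nothing reserves the port once the probe socket is closed.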
    
    Related test failures are suspected, such as
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122225/testReport/org.apache.spark.sql.kafka010/KafkaDelegationTokenSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/
    
    ```
    [info] org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite *** ABORTED *** (15 seconds, 426 milliseconds)
    [info]   java.net.BindException: Address already in use
    [info]   at sun.nio.ch.Net.bind0(Native Method)
    [info]   at sun.nio.ch.Net.bind(Net.java:433)
    [info]   at sun.nio.ch.Net.bind(Net.java:425)
    [info]   at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    [info]   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    [info]   at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:198)
    [info]   at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:51)
    [info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor.registerHandles(AbstractPollingIoAcceptor.java:547)
    [info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor.access$400(AbstractPollingIoAcceptor.java:68)
    [info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor$Acceptor.run(AbstractPollingIoAcceptor.java:422)
    [info]   at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
    [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [info]   at java.lang.Thread.run(Thread.java:748)
    ```
    After comparing the error stack trace with similar issues reported in other projects, such as
    https://issues.apache.org/jira/browse/KAFKA-3453
    https://issues.apache.org/jira/browse/HBASE-14734
    
    we can be confident that these failures are caused by the same problem described in HADOOP-12656.
    
    In this PR, we apply the approach from HBase (retrying MiniKdc startup) as a workaround until we finally drop Hadoop 2.7.x.
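    
    The workaround amounts to "start, and on failure clean up and try again". The patch itself relies on ScalaTest's `eventually`; the sketch below shows the same idea without any test library. `RetrySketch`, `retry`, `maxAttempts`, `intervalMs`, `cleanup`, and `start` are hypothetical names chosen for illustration, and the simulated `BindException` stands in for a real MiniKdc startup failure.
    
    ```scala
    import java.net.BindException
    import scala.annotation.tailrec
    import scala.util.control.NonFatal
    
    object RetrySketch {
      // Hypothetical helper: runs `start` up to `maxAttempts` times. After each
      // failed attempt it calls `cleanup`, then either rethrows (no attempts
      // left) or sleeps `intervalMs` milliseconds and tries again.
      @tailrec
      def retry[T](maxAttempts: Int, intervalMs: Long)(cleanup: () => Unit)(start: () => T): T = {
        val attempt: Either[Throwable, T] =
          try Right(start()) catch { case NonFatal(e) => Left(e) }
        attempt match {
          case Right(value) => value
          case Left(e) =>
            cleanup()
            if (maxAttempts <= 1) throw e
            else {
              Thread.sleep(intervalMs)
              retry(maxAttempts - 1, intervalMs)(cleanup)(start)
            }
        }
      }
    
      def main(args: Array[String]): Unit = {
        var calls = 0
        // Simulated flaky startup: the first two attempts fail to bind.
        val result = retry(maxAttempts = 5, intervalMs = 10L)(() => ()) { () =>
          calls += 1
          if (calls < 3) throw new BindException("Address already in use")
          s"started after $calls attempts"
        }
        println(result)
      }
    }
    ```
    
    Running `cleanup` before every retry matters here because a half-started service may hold resources that would make the next attempt fail for a different reason.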
    
    ### Why are the changes needed?
    
    Fix test flakiness.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    
    The affected tests pass on Jenkins.
    
    Closes #28442 from yaooqinn/SPARK-31631.
    
    Authored-by: Kent Yao <yaooq...@hotmail.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../HadoopDelegationTokenManagerSuite.scala        | 30 ++++++++++++++++++++--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 29 ++++++++++++++++++---
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 275bca3..fc28968 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -19,10 +19,14 @@ package org.apache.spark.deploy.security
 
 import java.security.PrivilegedExceptionAction
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -88,8 +92,30 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
       // krb5.conf. MiniKdc sets "java.security.krb5.conf" in start and removes it when stop called.
       val kdcDir = Utils.createTempDir()
       val kdcConf = MiniKdc.createConf()
-      kdc = new MiniKdc(kdcConf, kdcDir)
-      kdc.start()
+      // The port for MiniKdc service gets selected in the constructor, but will be bound
+      // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+      // In meantime, when some other service might capture the port during this progress, and
+      // cause BindException.
+      // This makes our tests which have dedicated JVMs and rely on MiniKDC being flaky
+      //
+      // https://issues.apache.org/jira/browse/HADOOP-12656 get fixed in Hadoop 2.8.0.
+      //
+      // The workaround here is to periodically repeat this process with a timeout , since we are
+      // using Hadoop 2.7.4 as default.
+      // https://issues.apache.org/jira/browse/SPARK-31631
+      eventually(timeout(10.seconds), interval(1.second)) {
+        try {
+          kdc = new MiniKdc(kdcConf, kdcDir)
+          kdc.start()
+        } catch {
+          case NonFatal(e) =>
+            if (kdc != null) {
+              kdc.stop()
+              kdc = null
+            }
+            throw e
+        }
+      }
 
       val krbConf = new Configuration()
       krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c1ca557..5db8999 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration
 import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
+import scala.util.control.NonFatal
 
 import com.google.common.io.Files
 import kafka.api.Request
@@ -36,7 +37,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -136,8 +137,30 @@ class KafkaTestUtils(
     val kdcDir = Utils.createTempDir()
     val kdcConf = MiniKdc.createConf()
     kdcConf.setProperty(MiniKdc.DEBUG, "true")
-    kdc = new MiniKdc(kdcConf, kdcDir)
-    kdc.start()
+    // The port for MiniKdc service gets selected in the constructor, but will be bound
+    // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+    // In meantime, when some other service might capture the port during this progress, and
+    // cause BindException.
+    // This makes our tests which have dedicated JVMs and rely on MiniKDC being flaky
+    //
+    // https://issues.apache.org/jira/browse/HADOOP-12656 get fixed in Hadoop 2.8.0.
+    //
+    // The workaround here is to periodically repeat this process with a timeout , since we are
+    // using Hadoop 2.7.4 as default.
+    // https://issues.apache.org/jira/browse/SPARK-31631
+    eventually(timeout(10.seconds), interval(1.second)) {
+      try {
+        kdc = new MiniKdc(kdcConf, kdcDir)
+        kdc.start()
+      } catch {
+        case NonFatal(e) =>
+          if (kdc != null) {
+            kdc.stop()
+            kdc = null
+          }
+          throw e
+      }
+    }
     // TODO https://issues.apache.org/jira/browse/SPARK-30037
     // Need to build spark's own MiniKDC and customize krb5.conf like Kafka
     rewriteKrb5Conf()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
