This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new fafe0f3 [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry fafe0f3 is described below commit fafe0f311cc1c48002b68f26ab9b274ffd565665 Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Thu May 7 14:37:03 2020 +0900 [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry ### What changes were proposed in this pull request? The `Kafka*Suite`s are flaky because of the Hadoop MiniKdc issue - https://issues.apache.org/jira/browse/HADOOP-12656 > Looking at MiniKdc implementation, if port is 0, the constructor use ServerSocket to find an unused port, assign the port number to the member variable port and close the ServerSocket object; later, in initKDCServer(), instantiate a TcpTransport object and bind at that port. > It appears that the port may be used in between, and then throw the exception. Related test failures are suspected, such as https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122225/testReport/org.apache.spark.sql.kafka010/KafkaDelegationTokenSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/ ```scala [info] org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite *** ABORTED *** (15 seconds, 426 milliseconds) [info] java.net.BindException: Address already in use [info] at sun.nio.ch.Net.bind0(Native Method) [info] at sun.nio.ch.Net.bind(Net.java:433) [info] at sun.nio.ch.Net.bind(Net.java:425) [info] at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) [info] at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) [info] at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:198) [info] at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:51) [info] at 
org.apache.mina.core.polling.AbstractPollingIoAcceptor.registerHandles(AbstractPollingIoAcceptor.java:547) [info] at org.apache.mina.core.polling.AbstractPollingIoAcceptor.access$400(AbstractPollingIoAcceptor.java:68) [info] at org.apache.mina.core.polling.AbstractPollingIoAcceptor$Acceptor.run(AbstractPollingIoAcceptor.java:422) [info] at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64) [info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [info] at java.lang.Thread.run(Thread.java:748) ``` After comparing the error stack trace with similar issues reported in different projects, such as https://issues.apache.org/jira/browse/KAFKA-3453 https://issues.apache.org/jira/browse/HBASE-14734 We can be sure that they are caused by the same problem issued in HADOOP-12656. In the PR, We apply the approach from HBASE first before we finally drop Hadoop 2.7.x ### Why are the changes needed? fix test flakiness ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? the test itself passing Jenkins Closes #28442 from yaooqinn/SPARK-31631. 
Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../HadoopDelegationTokenManagerSuite.scala | 30 ++++++++++++++++++++-- .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 29 ++++++++++++++++++--- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 275bca3..fc28968 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -19,10 +19,14 @@ package org.apache.spark.deploy.security import java.security.PrivilegedExceptionAction +import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION import org.apache.hadoop.minikdc.MiniKdc import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil @@ -88,8 +92,30 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { // krb5.conf. MiniKdc sets "java.security.krb5.conf" in start and removes it when stop called. val kdcDir = Utils.createTempDir() val kdcConf = MiniKdc.createConf() - kdc = new MiniKdc(kdcConf, kdcDir) - kdc.start() + // The port for MiniKdc service gets selected in the constructor, but will be bound + // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start(). + // In meantime, when some other service might capture the port during this progress, and + // cause BindException. 
+ // This makes our tests, which have dedicated JVMs and rely on MiniKDC, flaky + // + // https://issues.apache.org/jira/browse/HADOOP-12656 was fixed in Hadoop 2.8.0. + // + // The workaround here is to periodically repeat this process with a timeout, since we are + // using Hadoop 2.7.4 as default. + // https://issues.apache.org/jira/browse/SPARK-31631 + eventually(timeout(10.seconds), interval(1.second)) { + try { + kdc = new MiniKdc(kdcConf, kdcDir) + kdc.start() + } catch { + case NonFatal(e) => + if (kdc != null) { + kdc.stop() + kdc = null + } + throw e + } + } val krbConf = new Configuration() krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos") diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index c1ca557..5db8999 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration import scala.collection.JavaConverters._ import scala.io.Source import scala.util.Random +import scala.util.control.NonFatal import com.google.common.io.Files import kafka.api.Request @@ -36,7 +37,7 @@ import kafka.zk.KafkaZkClient import org.apache.hadoop.minikdc.MiniKdc import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic} +import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition @@ -136,8 +137,30 @@ class KafkaTestUtils( val kdcDir = Utils.createTempDir() val kdcConf = MiniKdc.createConf() kdcConf.setProperty(MiniKdc.DEBUG,
"true") - kdc = new MiniKdc(kdcConf, kdcDir) - kdc.start() + // The port for MiniKdc service gets selected in the constructor, but will be bound + // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start(). + // In meantime, when some other service might capture the port during this progress, and + // cause BindException. + // This makes our tests which have dedicated JVMs and rely on MiniKDC being flaky + // + // https://issues.apache.org/jira/browse/HADOOP-12656 get fixed in Hadoop 2.8.0. + // + // The workaround here is to periodically repeat this process with a timeout , since we are + // using Hadoop 2.7.4 as default. + // https://issues.apache.org/jira/browse/SPARK-31631 + eventually(timeout(10.seconds), interval(1.second)) { + try { + kdc = new MiniKdc(kdcConf, kdcDir) + kdc.start() + } catch { + case NonFatal(e) => + if (kdc != null) { + kdc.stop() + kdc = null + } + throw e + } + } // TODO https://issues.apache.org/jira/browse/SPARK-30037 // Need to build spark's own MiniKDC and customize krb5.conf like Kafka rewriteKrb5Conf() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org