This is an automated email from the ASF dual-hosted git repository. sidmishra pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 66cea39 ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue 66cea39 is described below commit 66cea39bf45b01abc6935797e94a59826a037296 Author: Sidharth Mishra <sidharthkmis...@gmail.com> AuthorDate: Thu Oct 28 13:48:37 2021 -0700 ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue Signed-off-by: Sidharth Mishra <sidmis...@apache.org> (cherry picked from commit 65bcd9f57354fd5e50435d299df8508c14108184) --- addons/falcon-bridge/pom.xml | 2 + addons/hbase-bridge/pom.xml | 2 + addons/hive-bridge/pom.xml | 2 + addons/impala-bridge/pom.xml | 2 + addons/kafka-bridge/pom.xml | 2 + addons/sqoop-bridge/pom.xml | 2 + addons/storm-bridge/pom.xml | 2 + .../apache/atlas/kafka/EmbeddedKafkaServer.java | 69 +++++++++++++---- .../apache/atlas/util/CommandHandlerUtility.java | 88 ++++++++++++++++++++++ pom.xml | 4 +- webapp/pom.xml | 2 + 11 files changed, 159 insertions(+), 18 deletions(-) diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml index a1c1972..8e25111 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -310,6 +310,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> <configuration> diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml index 439943a..063105d 100644 --- a/addons/hbase-bridge/pom.xml +++ b/addons/hbase-bridge/pom.xml @@ -466,6 +466,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> <configuration> diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 5385010..3910495 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -436,6 +436,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> </execution> diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml index d623885..ab977da 100644 --- a/addons/impala-bridge/pom.xml +++ b/addons/impala-bridge/pom.xml @@ -456,6 +456,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> </execution> diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml index f721a12..59b77fb 100644 --- a/addons/kafka-bridge/pom.xml +++ b/addons/kafka-bridge/pom.xml @@ -302,6 +302,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> <configuration> diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml index 5975462..0a2cbfb 100644 --- a/addons/sqoop-bridge/pom.xml +++ b/addons/sqoop-bridge/pom.xml @@ -370,6 +370,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> <configuration> diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml index e50124d..7d205b3 100644 --- a/addons/storm-bridge/pom.xml +++ b/addons/storm-bridge/pom.xml @@ -487,6 +487,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> <configuration> diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java index 19717fb..0a1f02a 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java @@ -19,12 +19,14 @@ package org.apache.atlas.kafka; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; +import kafka.zookeeper.ZooKeeperClientException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationConverter; import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.Time; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; @@ -33,19 +35,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import org.apache.atlas.util.CommandHandlerUtility; import scala.Option; -import scala.collection.mutable.ArrayBuffer; import javax.inject.Inject; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.MalformedURLException; -import java.net.URISyntaxException; import java.net.URL; +import java.net.BindException; import java.util.*; - @Component @Order(3) public class EmbeddedKafkaServer implements Service { @@ -55,8 +56,10 @@ public class EmbeddedKafkaServer implements Service { private static final String ATLAS_KAFKA_DATA = "data"; public static final String PROPERTY_EMBEDDED = "atlas.notification.embedded"; + private static final int MAX_RETRY_TO_ACQUIRE_PORT = 3; + private final boolean isEmbedded; - private final Properties properties; + private Properties properties; private KafkaServer kafkaServer; private ServerCnxnFactory factory; @@ -102,7 +105,7 @@ public class EmbeddedKafkaServer implements Service { LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded); } - private String startZk() throws IOException, InterruptedException, URISyntaxException { + private String startZk() throws IOException, InterruptedException { String zkValue = properties.getProperty("zookeeper.connect"); LOG.info("Starting zookeeper at {}", zkValue); @@ -111,7 +114,20 @@ public class EmbeddedKafkaServer implements Service { File snapshotDir = constructDir("zk/txn"); File logDir = constructDir("zk/snap"); - factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024); + for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) { + try { + factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024); + break; + } catch (BindException e) { + LOG.warn("Attempt {}: Starting zookeeper at {} failed", attemptCount, zkValue); + + if(attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) { + throw e; + } + + CommandHandlerUtility.tryKillingProcessUsingPort(zkAddress.getPort(), attemptCount != 0); + } + } factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500)); @@ -122,7 +138,7 @@ public class EmbeddedKafkaServer implements Service { return ret; } - private void startKafka() throws IOException, URISyntaxException { + private void startKafka() throws IOException { String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); LOG.info("Starting kafka at {}", kafkaValue); @@ -130,15 +146,36 @@ public class EmbeddedKafkaServer implements Service { URL kafkaAddress = getURL(kafkaValue); Properties brokerConfig = properties; - brokerConfig.setProperty("broker.id", "1"); - brokerConfig.setProperty("host.name", kafkaAddress.getHost()); - brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort())); - brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); - brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); - - kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false); - - kafkaServer.startup(); + for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) { + try { + brokerConfig.setProperty("broker.id", "1"); + brokerConfig.setProperty("host.name", kafkaAddress.getHost()); + brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort())); + brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); + brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); + + kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false); + + kafkaServer.startup(); + break; + } catch (KafkaException | ZooKeeperClientException e) { + LOG.warn("Attempt {}: kafka server with broker config {} failed", attemptCount, brokerConfig); + + if (attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) { + throw e; + } + + if (kafkaServer != null) { + try { + kafkaServer.shutdown(); + } catch (Exception ex) { + LOG.info("Failed to shutdown kafka server", ex); + } + } + + CommandHandlerUtility.tryKillingProcessUsingPort(kafkaAddress.getPort(), attemptCount != 0); + } + } LOG.info("Embedded kafka server started with broker config {}", brokerConfig); } diff --git a/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java new file mode 100644 index 0000000..13ee786 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.util; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.stream.Collectors; + +public class CommandHandlerUtility { + public static final Logger LOG = LoggerFactory.getLogger(CommandHandlerUtility.class); + + private static final String SHELL_CMD = "/bin/sh"; + private static final String SHELL_CMD_OPTION = "-c"; + private static final String FIND_PROCESS_ID_CMD_FORMAT = "lsof -i:%s | tail -n 1 | tr -s ' ' | cut -d' ' -f2"; + private static final String KILL_PROCESS_CMD_FORMAT = "kill %s %s" ; + private static final int SLEEP_AFTER_SOFT_KILL_IN_MS = 10000; + + public static void tryKillingProcessUsingPort(int port, boolean forceKill) { + String processID = findProcessIdUsingPort(port); + sendKillToPID(processID, forceKill); + } + + private static String findProcessIdUsingPort(int port) { + String retPID = ""; + + final String[] cmd = { + SHELL_CMD, + SHELL_CMD_OPTION, + String.format(FIND_PROCESS_ID_CMD_FORMAT, port) + }; + + try { + Process p = Runtime.getRuntime().exec(cmd); + retPID = new BufferedReader(new InputStreamReader(p.getInputStream())) + .lines().collect(Collectors.joining("\n")); + + if (StringUtils.isEmpty(retPID)) { + String errorMsg = new BufferedReader(new InputStreamReader(p.getErrorStream())) + .lines().collect(Collectors.joining("\n")); + throw new IOException(errorMsg); + } + } catch (IOException e) { + LOG.warn("Failed to get process ID which uses the port{}", port, e); + } + + return retPID; + } + + private static void sendKillToPID(String pid, boolean forceKill) { + if (StringUtils.isBlank(pid)) { + return; + } + + final String cmd = String.format(KILL_PROCESS_CMD_FORMAT, (forceKill ? "-9 " : ""), pid); + + try { + Runtime.getRuntime().exec(cmd); + + if (!forceKill) { + LOG.info("Sleeping for {} milliseconds after soft kill", SLEEP_AFTER_SOFT_KILL_IN_MS); + Thread.sleep(SLEEP_AFTER_SOFT_KILL_IN_MS); + } + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to kill the process {} which uses the port with hard kill flag{}", pid, forceKill, e); + } + } +} diff --git a/pom.xml b/pom.xml index c1cd69d..b5dbc46 100644 --- a/pom.xml +++ b/pom.xml @@ -765,7 +765,7 @@ <sqoop.version>1.4.6.2.3.99.0-195</sqoop.version> <storm.version>2.1.0</storm.version> <surefire.forkCount>2C</surefire.forkCount> - <surefire.version>2.18.1</surefire.version> + <surefire.version>3.0.0-M5</surefire.version> <testng.version>6.9.4</testng.version> <tinkerpop.version>3.4.10</tinkerpop.version> <woodstox-core.version>5.0.3</woodstox-core.version> @@ -1952,7 +1952,7 @@ <dependency> <groupId>org.apache.maven.surefire</groupId> <artifactId>surefire-testng</artifactId> - <version>2.18.1</version> + <version>${surefire.version}</version> </dependency> </dependencies> </plugin> diff --git a/webapp/pom.xml b/webapp/pom.xml index 68658a1..0ef5cc4 100755 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -768,6 +768,8 @@ <id>start-jetty</id> <phase>pre-integration-test</phase> <goals> + <!-- stop any previous instance to free up the port --> + <goal>stop</goal> <goal>deploy-war</goal> </goals> </execution>