This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 66cea39  ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue
66cea39 is described below

commit 66cea39bf45b01abc6935797e94a59826a037296
Author: Sidharth Mishra <sidharthkmis...@gmail.com>
AuthorDate: Thu Oct 28 13:48:37 2021 -0700

    ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue
    
    Signed-off-by: Sidharth Mishra <sidmis...@apache.org>
    (cherry picked from commit 65bcd9f57354fd5e50435d299df8508c14108184)
---
 addons/falcon-bridge/pom.xml                       |  2 +
 addons/hbase-bridge/pom.xml                        |  2 +
 addons/hive-bridge/pom.xml                         |  2 +
 addons/impala-bridge/pom.xml                       |  2 +
 addons/kafka-bridge/pom.xml                        |  2 +
 addons/sqoop-bridge/pom.xml                        |  2 +
 addons/storm-bridge/pom.xml                        |  2 +
 .../apache/atlas/kafka/EmbeddedKafkaServer.java    | 69 +++++++++++++----
 .../apache/atlas/util/CommandHandlerUtility.java   | 88 ++++++++++++++++++++++
 pom.xml                                            |  4 +-
 webapp/pom.xml                                     |  2 +
 11 files changed, 159 insertions(+), 18 deletions(-)

diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index a1c1972..8e25111 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -310,6 +310,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml
index 439943a..063105d 100644
--- a/addons/hbase-bridge/pom.xml
+++ b/addons/hbase-bridge/pom.xml
@@ -466,6 +466,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 5385010..3910495 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -436,6 +436,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                     </execution>
diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
index d623885..ab977da 100644
--- a/addons/impala-bridge/pom.xml
+++ b/addons/impala-bridge/pom.xml
@@ -456,6 +456,8 @@
             <id>start-jetty</id>
             <phase>pre-integration-test</phase>
             <goals>
+              <!-- stop any previous instance to free up the port -->
+              <goal>stop</goal>
               <goal>deploy-war</goal>
             </goals>
           </execution>
diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index f721a12..59b77fb 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -302,6 +302,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 5975462..0a2cbfb 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -370,6 +370,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index e50124d..7d205b3 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -487,6 +487,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 19717fb..0a1f02a 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -19,12 +19,14 @@ package org.apache.atlas.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
+import kafka.zookeeper.ZooKeeperClientException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -33,19 +35,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
+import org.apache.atlas.util.CommandHandlerUtility;
 import scala.Option;
-import scala.collection.mutable.ArrayBuffer;
 
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
-import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.BindException;
 import java.util.*;
 
-
 @Component
 @Order(3)
 public class EmbeddedKafkaServer implements Service {
@@ -55,8 +56,10 @@ public class EmbeddedKafkaServer implements Service {
     private static final String ATLAS_KAFKA_DATA  = "data";
     public  static final String PROPERTY_EMBEDDED = "atlas.notification.embedded";
 
+    private static final int    MAX_RETRY_TO_ACQUIRE_PORT = 3;
+
     private final boolean           isEmbedded;
-    private final Properties        properties;
+    private       Properties        properties;
     private       KafkaServer       kafkaServer;
     private       ServerCnxnFactory factory;
 
@@ -102,7 +105,7 @@ public class EmbeddedKafkaServer implements Service {
         LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded);
     }
 
-    private String startZk() throws IOException, InterruptedException, URISyntaxException {
+    private String startZk() throws IOException, InterruptedException {
         String zkValue = properties.getProperty("zookeeper.connect");
 
         LOG.info("Starting zookeeper at {}", zkValue);
@@ -111,7 +114,20 @@ public class EmbeddedKafkaServer implements Service {
         File snapshotDir = constructDir("zk/txn");
         File logDir      = constructDir("zk/snap");
 
-        factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+        for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) {
+            try {
+                factory     = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+                break;
+            } catch (BindException e) {
+                LOG.warn("Attempt {}: Starting zookeeper at {} failed", attemptCount, zkValue);
+
+                if(attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) {
+                    throw e;
+                }
+
+                CommandHandlerUtility.tryKillingProcessUsingPort(zkAddress.getPort(), attemptCount != 0);
+            }
+        }
 
         factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
 
@@ -122,7 +138,7 @@ public class EmbeddedKafkaServer implements Service {
         return ret;
     }
 
-    private void startKafka() throws IOException, URISyntaxException {
+    private void startKafka() throws IOException {
         String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
 
         LOG.info("Starting kafka at {}", kafkaValue);
@@ -130,15 +146,36 @@ public class EmbeddedKafkaServer implements Service {
         URL        kafkaAddress = getURL(kafkaValue);
         Properties brokerConfig = properties;
 
-        brokerConfig.setProperty("broker.id", "1");
-        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
-        brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
-        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
-        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
-
-        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false);
-
-        kafkaServer.startup();
+        for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) {
+            try {
+                brokerConfig.setProperty("broker.id", "1");
+                brokerConfig.setProperty("host.name", kafkaAddress.getHost());
+                brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
+                brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
+                brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+                kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false);
+
+                kafkaServer.startup();
+                break;
+            } catch (KafkaException | ZooKeeperClientException e) {
+                LOG.warn("Attempt {}: kafka server with broker config {} failed", attemptCount, brokerConfig);
+
+                if (attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) {
+                    throw e;
+                }
+
+                if (kafkaServer != null) {
+                    try {
+                        kafkaServer.shutdown();
+                    } catch (Exception ex) {
+                        LOG.info("Failed to shutdown kafka server", ex);
+                    }
+                }
+
+                CommandHandlerUtility.tryKillingProcessUsingPort(kafkaAddress.getPort(), attemptCount != 0);
+            }
+        }
 
         LOG.info("Embedded kafka server started with broker config {}", brokerConfig);
     }
diff --git a/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java
new file mode 100644
index 0000000..13ee786
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.stream.Collectors;
+
+public class CommandHandlerUtility {
+    public static final Logger LOG = LoggerFactory.getLogger(CommandHandlerUtility.class);
+
+    private static final String SHELL_CMD                   = "/bin/sh";
+    private static final String SHELL_CMD_OPTION            = "-c";
+    private static final String FIND_PROCESS_ID_CMD_FORMAT  = "lsof -i:%s | tail -n 1 | tr -s ' ' | cut -d' ' -f2";
+    private static final String KILL_PROCESS_CMD_FORMAT     = "kill %s %s" ;
+    private static final int    SLEEP_AFTER_SOFT_KILL_IN_MS = 10000;
+
+    public static void tryKillingProcessUsingPort(int port, boolean forceKill) {
+        String processID = findProcessIdUsingPort(port);
+        sendKillToPID(processID, forceKill);
+    }
+
+    private static String findProcessIdUsingPort(int port) {
+        String         retPID = "";
+
+        final String[] cmd = {
+                SHELL_CMD,
+                SHELL_CMD_OPTION,
+                String.format(FIND_PROCESS_ID_CMD_FORMAT, port)
+        };
+
+        try {
+            Process p = Runtime.getRuntime().exec(cmd);
+            retPID = new BufferedReader(new InputStreamReader(p.getInputStream()))
+                    .lines().collect(Collectors.joining("\n"));
+
+            if (StringUtils.isEmpty(retPID)) {
+                String errorMsg = new BufferedReader(new InputStreamReader(p.getErrorStream()))
+                        .lines().collect(Collectors.joining("\n"));
+                throw new IOException(errorMsg);
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get process ID which uses the port{}", port, e);
+        }
+
+        return retPID;
+    }
+
+    private static void sendKillToPID(String pid, boolean forceKill) {
+        if (StringUtils.isBlank(pid)) {
+            return;
+        }
+
+        final String cmd = String.format(KILL_PROCESS_CMD_FORMAT, (forceKill ? "-9 " : ""), pid);
+
+        try {
+            Runtime.getRuntime().exec(cmd);
+
+            if (!forceKill) {
+                LOG.info("Sleeping for {} milliseconds after soft kill", SLEEP_AFTER_SOFT_KILL_IN_MS);
+                Thread.sleep(SLEEP_AFTER_SOFT_KILL_IN_MS);
+            }
+        } catch (IOException | InterruptedException e) {
+            LOG.warn("Failed to kill the process {} which uses the port with hard kill flag{}", pid, forceKill, e);
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index c1cd69d..b5dbc46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -765,7 +765,7 @@
         <sqoop.version>1.4.6.2.3.99.0-195</sqoop.version>
         <storm.version>2.1.0</storm.version>
         <surefire.forkCount>2C</surefire.forkCount>
-        <surefire.version>2.18.1</surefire.version>
+        <surefire.version>3.0.0-M5</surefire.version>
         <testng.version>6.9.4</testng.version>
         <tinkerpop.version>3.4.10</tinkerpop.version>
         <woodstox-core.version>5.0.3</woodstox-core.version>
@@ -1952,7 +1952,7 @@
                     <dependency>
                         <groupId>org.apache.maven.surefire</groupId>
                         <artifactId>surefire-testng</artifactId>
-                        <version>2.18.1</version>
+                        <version>${surefire.version}</version>
                     </dependency>
                 </dependencies>
             </plugin>
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 68658a1..0ef5cc4 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -768,6 +768,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                     </execution>

Reply via email to