This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 66cea39 ATLAS-4462: Upgraded Surefire version, Updated the pom to
stop jetty before start, Added retry logic for Embedded Kafka start having port
bind issue
66cea39 is described below
commit 66cea39bf45b01abc6935797e94a59826a037296
Author: Sidharth Mishra <[email protected]>
AuthorDate: Thu Oct 28 13:48:37 2021 -0700
ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before
start, Added retry logic for Embedded Kafka start having port bind issue
Signed-off-by: Sidharth Mishra <[email protected]>
(cherry picked from commit 65bcd9f57354fd5e50435d299df8508c14108184)
---
addons/falcon-bridge/pom.xml | 2 +
addons/hbase-bridge/pom.xml | 2 +
addons/hive-bridge/pom.xml | 2 +
addons/impala-bridge/pom.xml | 2 +
addons/kafka-bridge/pom.xml | 2 +
addons/sqoop-bridge/pom.xml | 2 +
addons/storm-bridge/pom.xml | 2 +
.../apache/atlas/kafka/EmbeddedKafkaServer.java | 69 +++++++++++++----
.../apache/atlas/util/CommandHandlerUtility.java | 88 ++++++++++++++++++++++
pom.xml | 4 +-
webapp/pom.xml | 2 +
11 files changed, 159 insertions(+), 18 deletions(-)
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index a1c1972..8e25111 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -310,6 +310,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
<configuration>
diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml
index 439943a..063105d 100644
--- a/addons/hbase-bridge/pom.xml
+++ b/addons/hbase-bridge/pom.xml
@@ -466,6 +466,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
<configuration>
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 5385010..3910495 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -436,6 +436,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
</execution>
diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
index d623885..ab977da 100644
--- a/addons/impala-bridge/pom.xml
+++ b/addons/impala-bridge/pom.xml
@@ -456,6 +456,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
</execution>
diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index f721a12..59b77fb 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -302,6 +302,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
<configuration>
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 5975462..0a2cbfb 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -370,6 +370,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
<configuration>
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index e50124d..7d205b3 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -487,6 +487,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
<configuration>
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 19717fb..0a1f02a 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -19,12 +19,14 @@ package org.apache.atlas.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
+import kafka.zookeeper.ZooKeeperClientException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -33,19 +35,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
+import org.apache.atlas.util.CommandHandlerUtility;
import scala.Option;
-import scala.collection.mutable.ArrayBuffer;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
-import java.net.URISyntaxException;
import java.net.URL;
+import java.net.BindException;
import java.util.*;
-
@Component
@Order(3)
public class EmbeddedKafkaServer implements Service {
@@ -55,8 +56,10 @@ public class EmbeddedKafkaServer implements Service {
private static final String ATLAS_KAFKA_DATA = "data";
public static final String PROPERTY_EMBEDDED = "atlas.notification.embedded";
+ private static final int MAX_RETRY_TO_ACQUIRE_PORT = 3;
+
private final boolean isEmbedded;
- private final Properties properties;
+ private Properties properties;
private KafkaServer kafkaServer;
private ServerCnxnFactory factory;
@@ -102,7 +105,7 @@ public class EmbeddedKafkaServer implements Service {
LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded);
}
- private String startZk() throws IOException, InterruptedException, URISyntaxException {
+ private String startZk() throws IOException, InterruptedException {
String zkValue = properties.getProperty("zookeeper.connect");
LOG.info("Starting zookeeper at {}", zkValue);
@@ -111,7 +114,20 @@ public class EmbeddedKafkaServer implements Service {
File snapshotDir = constructDir("zk/txn");
File logDir = constructDir("zk/snap");
- factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+ for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) {
+ try {
+ factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+ break;
+ } catch (BindException e) {
+ LOG.warn("Attempt {}: Starting zookeeper at {} failed", attemptCount, zkValue);
+
+ if(attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) {
+ throw e;
+ }
+
+ CommandHandlerUtility.tryKillingProcessUsingPort(zkAddress.getPort(), attemptCount != 0);
+ }
+ }
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
@@ -122,7 +138,7 @@ public class EmbeddedKafkaServer implements Service {
return ret;
}
- private void startKafka() throws IOException, URISyntaxException {
+ private void startKafka() throws IOException {
String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
LOG.info("Starting kafka at {}", kafkaValue);
@@ -130,15 +146,36 @@ public class EmbeddedKafkaServer implements Service {
URL kafkaAddress = getURL(kafkaValue);
Properties brokerConfig = properties;
- brokerConfig.setProperty("broker.id", "1");
- brokerConfig.setProperty("host.name", kafkaAddress.getHost());
- brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
- brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
- brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
-
- kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false);
-
- kafkaServer.startup();
+ for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) {
+ try {
+ brokerConfig.setProperty("broker.id", "1");
+ brokerConfig.setProperty("host.name", kafkaAddress.getHost());
+ brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
+ brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
+ brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+ kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false);
+
+ kafkaServer.startup();
+ break;
+ } catch (KafkaException | ZooKeeperClientException e) {
+ LOG.warn("Attempt {}: kafka server with broker config {} failed", attemptCount, brokerConfig);
+
+ if (attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) {
+ throw e;
+ }
+
+ if (kafkaServer != null) {
+ try {
+ kafkaServer.shutdown();
+ } catch (Exception ex) {
+ LOG.info("Failed to shutdown kafka server", ex);
+ }
+ }
+
+ CommandHandlerUtility.tryKillingProcessUsingPort(kafkaAddress.getPort(), attemptCount != 0);
+ }
+ }
LOG.info("Embedded kafka server started with broker config {}", brokerConfig);
}
diff --git a/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java
new file mode 100644
index 0000000..13ee786
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.stream.Collectors;
+
+public class CommandHandlerUtility {
+ public static final Logger LOG = LoggerFactory.getLogger(CommandHandlerUtility.class);
+
+ private static final String SHELL_CMD = "/bin/sh";
+ private static final String SHELL_CMD_OPTION = "-c";
+ private static final String FIND_PROCESS_ID_CMD_FORMAT = "lsof -i:%s | tail -n 1 | tr -s ' ' | cut -d' ' -f2";
+ private static final String KILL_PROCESS_CMD_FORMAT = "kill %s %s" ;
+ private static final int SLEEP_AFTER_SOFT_KILL_IN_MS = 10000;
+
+ public static void tryKillingProcessUsingPort(int port, boolean forceKill) {
+ String processID = findProcessIdUsingPort(port);
+ sendKillToPID(processID, forceKill);
+ }
+
+ private static String findProcessIdUsingPort(int port) {
+ String retPID = "";
+
+ final String[] cmd = {
+ SHELL_CMD,
+ SHELL_CMD_OPTION,
+ String.format(FIND_PROCESS_ID_CMD_FORMAT, port)
+ };
+
+ try {
+ Process p = Runtime.getRuntime().exec(cmd);
+ retPID = new BufferedReader(new InputStreamReader(p.getInputStream()))
+ .lines().collect(Collectors.joining("\n"));
+
+ if (StringUtils.isEmpty(retPID)) {
+ String errorMsg = new BufferedReader(new InputStreamReader(p.getErrorStream()))
+ .lines().collect(Collectors.joining("\n"));
+ throw new IOException(errorMsg);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to get process ID which uses the port{}", port, e);
+ }
+
+ return retPID;
+ }
+
+ private static void sendKillToPID(String pid, boolean forceKill) {
+ if (StringUtils.isBlank(pid)) {
+ return;
+ }
+
+ final String cmd = String.format(KILL_PROCESS_CMD_FORMAT, (forceKill ? "-9 " : ""), pid);
+
+ try {
+ Runtime.getRuntime().exec(cmd);
+
+ if (!forceKill) {
+ LOG.info("Sleeping for {} milliseconds after soft kill", SLEEP_AFTER_SOFT_KILL_IN_MS);
+ Thread.sleep(SLEEP_AFTER_SOFT_KILL_IN_MS);
+ }
+ } catch (IOException | InterruptedException e) {
+ LOG.warn("Failed to kill the process {} which uses the port with hard kill flag{}", pid, forceKill, e);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index c1cd69d..b5dbc46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -765,7 +765,7 @@
<sqoop.version>1.4.6.2.3.99.0-195</sqoop.version>
<storm.version>2.1.0</storm.version>
<surefire.forkCount>2C</surefire.forkCount>
- <surefire.version>2.18.1</surefire.version>
+ <surefire.version>3.0.0-M5</surefire.version>
<testng.version>6.9.4</testng.version>
<tinkerpop.version>3.4.10</tinkerpop.version>
<woodstox-core.version>5.0.3</woodstox-core.version>
@@ -1952,7 +1952,7 @@
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
- <version>2.18.1</version>
+ <version>${surefire.version}</version>
</dependency>
</dependencies>
</plugin>
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 68658a1..0ef5cc4 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -768,6 +768,8 @@
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
+ <!-- stop any previous instance to free up the port -->
+ <goal>stop</goal>
<goal>deploy-war</goal>
</goals>
</execution>