[CARBONDATA-2968] Single pass load fails 2nd time in Spark submit execution due to port binding error
Problem: In a secure cluster setup, single pass load fails in spark-submit after beeline has been used. Solution: The failure occurred because the port was never updated, so the server did not look for the next free port: every retry passed the unchanged port variable to createServer instead of newPort. The bind call now uses newPort, and the failure log now displays the port number. This closes #2760 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/13ecc9e7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/13ecc9e7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/13ecc9e7 Branch: refs/heads/branch-1.5 Commit: 13ecc9e7a0a42ebf2f8417814c20474f3ce489f1 Parents: e07df44 Author: shardul-cr7 <[email protected]> Authored: Tue Sep 25 19:55:19 2018 +0530 Committer: kumarvishal09 <[email protected]> Committed: Wed Sep 26 14:16:21 2018 +0530 ---------------------------------------------------------------------- .../core/dictionary/server/NonSecureDictionaryServer.java | 3 ++- .../spark/dictionary/server/SecureDictionaryServer.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/13ecc9e7/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java index 95f3d69..dc2d211 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java @@ -109,6 +109,7 @@ public class NonSecureDictionaryServer extends AbstractDictionaryServer }); 
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); String hostToBind = findLocalIpAddress(LOGGER); + //iteratively listening to newports InetSocketAddress address = hostToBind == null ? new InetSocketAddress(newPort) : new InetSocketAddress(hostToBind, newPort); @@ -119,7 +120,7 @@ public class NonSecureDictionaryServer extends AbstractDictionaryServer this.host = hostToBind; break; } catch (Exception e) { - LOGGER.error(e, "Dictionary Server Failed to bind to port:"); + LOGGER.error(e, "Dictionary Server Failed to bind to port:" + newPort); if (i == 9) { throw new RuntimeException("Dictionary Server Could not bind to any port"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/13ecc9e7/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java index f4948c4..995e520 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java @@ -143,14 +143,16 @@ public class SecureDictionaryServer extends AbstractDictionaryServer implements TransportServerBootstrap bootstrap = new SaslServerBootstrap(transportConf, securityManager); String host = findLocalIpAddress(LOGGER); - context.createServer(host, port, Lists.<TransportServerBootstrap>newArrayList(bootstrap)); + //iteratively listening to newports + context + .createServer(host, newPort, Lists.<TransportServerBootstrap>newArrayList(bootstrap)); LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start) + " Listening on port " + newPort); 
this.port = newPort; this.host = host; break; } catch (Exception e) { - LOGGER.error(e, "Dictionary Server Failed to bind to port:"); + LOGGER.error(e, "Dictionary Server Failed to bind to port: " + newPort); if (i == 9) { throw new RuntimeException("Dictionary Server Could not bind to any port"); }
