Repository: apex-core Updated Branches: refs/heads/master 65a721fb7 -> ae0ec2464
APEXCORE-516 - StramLocalCluster should always use loopback address for buffer server location Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/6fa7b91e Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/6fa7b91e Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/6fa7b91e Branch: refs/heads/master Commit: 6fa7b91ee2b1fdfc79ef36c9345485089583f371 Parents: 9c48c41 Author: Vlad Rozov <[email protected]> Authored: Mon Aug 29 10:56:00 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Mon Aug 29 10:56:00 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/stram/StramLocalCluster.java | 23 +++++++++++--------- .../stram/engine/StreamingContainer.java | 10 ++++----- 2 files changed, 17 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fa7b91e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index b3cf63f..23737d0 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.Collections; import java.util.List; import java.util.Map; @@ -37,7 +36,6 @@ import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.net.NetUtils; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode.Controller; @@ -69,6 +67,8 @@ public class StramLocalCluster implements Runnable, Controller private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class); // assumes execution as unit test private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName()); + private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname"; + private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost"); protected final StreamingContainerManager dnmgr; private final UmbilicalProtocolLocalImpl umbilical; private InetSocketAddress bufferServerAddress; @@ -169,19 +169,23 @@ public class StramLocalCluster implements Runnable, Controller this.windowGenerator = winGen; } - public static void run(StreamingContainer stramChild, StreamingContainerContext ctx) throws Exception + public void run(StreamingContainerContext ctx) throws Exception { LOG.debug("Got context: " + ctx); - stramChild.setup(ctx); + setup(ctx); + if (bufferServerAddress != null && !bufferServerAddress.getAddress().isLoopbackAddress()) { + bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServerAddress.getPort()); + } + boolean hasError = true; try { // main thread enters heartbeat loop - stramChild.heartbeatLoop(); + heartbeatLoop(); hasError = false; } finally { // teardown try { - stramChild.teardown(); + teardown(); } catch (Exception e) { if (!hasError) { throw e; @@ -247,7 +251,7 @@ public class StramLocalCluster implements Runnable, Controller } this.child = new LocalStreamingContainer(containerId, umbilical, wingen); ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null); - StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : NetUtils.getConnectAddress(bufferServerAddress)); + StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress); if (sca != null) { Thread launchThread = new Thread(this, containerId); launchThread.start(); @@ -261,7 +265,7 @@ public class StramLocalCluster implements Runnable, Controller { try { StreamingContainerContext ctx = umbilical.getInitContext(containerId); - LocalStreamingContainer.run(child, ctx); + child.run(ctx); } catch (Exception e) { LOG.error("Container {} failed", containerId, e); throw new RuntimeException(e); @@ -303,8 +307,7 @@ public class StramLocalCluster implements Runnable, Controller StreamingContainer.eventloop.start(); bufferServer = new Server(0, 1024 * 1024,8); bufferServer.setSpoolStorage(new DiskStorage()); - SocketAddress bindAddr = bufferServer.run(StreamingContainer.eventloop); - this.bufferServerAddress = ((InetSocketAddress)bindAddr); + bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort()); LOG.info("Buffer server started: {}", bufferServerAddress); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fa7b91e/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 756fec2..0c74b27 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -24,7 +24,6 @@ import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; @@ -155,8 +154,8 @@ public class StreamingContainer extends YarnContainerMain */ private long firstWindowMillis; private int windowWidthMillis; - private InetSocketAddress bufferServerAddress; - private com.datatorrent.bufferserver.server.Server bufferServer; + protected InetSocketAddress bufferServerAddress; + protected com.datatorrent.bufferserver.server.Server bufferServer; private int checkpointWindowCount; private boolean fastPublisherSubscriber; private StreamingContainerContext containerContext; @@ -230,9 +229,8 @@ public class StreamingContainer extends YarnContainerMain if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) { bufferServer.setSpoolStorage(new DiskStorage()); } - SocketAddress bindAddr = bufferServer.run(eventloop); - logger.debug("Buffer server started: {}", bindAddr); - this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress)bindAddr)); + bufferServerAddress = NetUtils.getConnectAddress(bufferServer.run(eventloop)); + logger.debug("Buffer server started: {}", bufferServerAddress); } } catch (IOException ex) { logger.warn("deploy request failed due to {}", ex);
