Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 b4b7c0717 -> b1af72403
Fixed style violations in StreamingContainer.java to address APEX-208 Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b0aab1ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b0aab1ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b0aab1ad Branch: refs/heads/devel-3 Commit: b0aab1adf7f00f170c954ea5cd7d731a9fe8dfa3 Parents: af7179c Author: Ilya Ganelin <[email protected]> Authored: Sun Nov 1 09:32:52 2015 -0800 Committer: Ilya Ganelin <[email protected]> Committed: Sun Nov 1 09:32:52 2015 -0800 ---------------------------------------------------------------------- .../stram/engine/StreamingContainer.java | 248 +++++++++---------- 1 file changed, 118 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0aab1ad/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index a17dfdf..1544c16 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -18,7 +18,6 @@ */ package com.datatorrent.stram.engine; -import java.io.File; import java.io.IOException; import java.lang.Thread.State; import java.lang.management.GarbageCollectorMXBean; @@ -28,19 +27,24 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.AbstractMap.SimpleEntry; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.config.BusConfiguration; - -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -52,14 +56,21 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.log4j.DTLoggerFactory; import org.apache.log4j.LogManager; -import com.datatorrent.api.*; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Operator.ProcessingMode; +import com.datatorrent.api.StatsListener; import com.datatorrent.api.StatsListener.OperatorRequest; +import com.datatorrent.api.StorageAgent; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.StringCodec; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.bufferserver.server.Server; import com.datatorrent.bufferserver.storage.DiskStorage; import com.datatorrent.bufferserver.util.Codec; @@ -70,19 +81,45 @@ import com.datatorrent.stram.ComponentContextPair; import com.datatorrent.stram.RecoverableRpcProxy; import com.datatorrent.stram.StramUtils.YarnContainerMain; import com.datatorrent.stram.StringCodecs; -import com.datatorrent.stram.api.*; -import com.datatorrent.stram.api.ContainerEvent.*; +import com.datatorrent.stram.api.Checkpoint; +import com.datatorrent.stram.api.ContainerContext; +import com.datatorrent.stram.api.ContainerEvent; +import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent; +import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent; +import com.datatorrent.stram.api.ContainerEvent.NodeDeactivationEvent; +import com.datatorrent.stram.api.ContainerEvent.StreamActivationEvent; +import com.datatorrent.stram.api.ContainerEvent.StreamDeactivationEvent; +import com.datatorrent.stram.api.OperatorDeployInfo; import com.datatorrent.stram.api.OperatorDeployInfo.OperatorType; import com.datatorrent.stram.api.OperatorDeployInfo.UnifierDeployInfo; -import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*; +import com.datatorrent.stram.api.RequestFactory; +import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext; import com.datatorrent.stram.debug.StdOutErrLog; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.Operators.PortContextPair; import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor; import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance; import com.datatorrent.stram.security.StramUserLogin; -import com.datatorrent.stram.stream.*; +import com.datatorrent.stram.stream.BufferServerPublisher; +import com.datatorrent.stram.stream.BufferServerSubscriber; +import com.datatorrent.stram.stream.FastPublisher; +import com.datatorrent.stram.stream.FastSubscriber; +import com.datatorrent.stram.stream.InlineStream; +import com.datatorrent.stram.stream.MuxStream; +import com.datatorrent.stram.stream.OiOStream; +import com.datatorrent.stram.stream.PartitionAwareSink; +import com.datatorrent.stram.stream.PartitionAwareSinkForPersistence; + +import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.config.BusConfiguration; /** * Object which controls the container process launched by {@link com.datatorrent.stram.StreamingAppMaster}. @@ -131,8 +168,7 @@ public class StreamingContainer extends YarnContainerMain static { try { eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop"); - } - catch (IOException io) { + } catch (IOException io) { throw new RuntimeException(io); } } @@ -183,8 +219,7 @@ public class StreamingContainer extends YarnContainerMain if (blocksize < 1) { blocksize = 1; } - } - else { + } else { blocksize = 64; blockCount = bufferServerRAM / blocksize; } @@ -196,10 +231,9 @@ public class StreamingContainer extends YarnContainerMain } SocketAddress bindAddr = bufferServer.run(eventloop); logger.debug("Buffer server started: {}", bindAddr); - this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress) bindAddr)); + this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress)bindAddr)); } - } - catch (IOException ex) { + } catch (IOException ex) { logger.warn("deploy request failed due to {}", ex); throw new IllegalStateException("Failed to deploy buffer server", ex); } @@ -210,15 +244,13 @@ public class StreamingContainer extends YarnContainerMain singletons.put(clazz.getName(), newInstance); if (newInstance instanceof Component) { - components.add((Component<ContainerContext>) newInstance); + components.add((Component<ContainerContext>)newInstance); } eventBus.subscribe(newInstance); - } - catch (InstantiationException ex) { + } catch (InstantiationException ex) { logger.warn("Container Event Listener Instantiation", ex); - } - catch (IllegalAccessException ex) { + } catch (IllegalAccessException ex) { logger.warn("Container Event Listener Instantiation", ex); } } @@ -279,24 +311,20 @@ public class StreamingContainer extends YarnContainerMain /* main thread enters heartbeat loop */ stramChild.heartbeatLoop(); exitStatus = 0; - } - finally { + } finally { stramChild.teardown(); } - } - catch (Error error) { + } catch (Error error) { logger.error("Fatal error in container!", error); /* Report back any failures, for diagnostic purposes */ String msg = ExceptionUtils.getStackTrace(error); umbilical.reportError(childId, null, "FATAL: " + msg); - } - catch (Exception exception) { + } catch (Exception exception) { logger.error("Fatal exception in container!", exception); /* Report back any failures, for diagnostic purposes */ String msg = ExceptionUtils.getStackTrace(exception); umbilical.reportError(childId, null, msg); - } - finally { + } finally { rpcProxy.close(); DefaultMetricsSystem.shutdown(); logger.info("Exit status for container: {}", exitStatus); @@ -317,8 +345,7 @@ public class StreamingContainer extends YarnContainerMain Thread t = e.getValue().context.getThread(); if (t == null || !t.isAlive()) { disconnectNode(e.getKey()); - } - else { + } else { activeThreads.add(t); activeOperators.add(e.getKey()); e.getValue().shutdown(); @@ -334,8 +361,7 @@ public class StreamingContainer extends YarnContainerMain } disconnectNode(iterator.next()); } - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { logger.warn("Aborting wait for operators to get deactivated!", ex); } @@ -370,15 +396,13 @@ public class StreamingContainer extends YarnContainerMain String sinks = pair.context.getSinkId(); if (sinks == null) { logger.error("mux sinks found connected at {} with sink id null", sourceIdentifier); - } - else { + } else { String[] split = sinks.split(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR); for (int i = split.length; i-- > 0; ) { ComponentContextPair<Stream, StreamContext> spair = streams.remove(split[i]); if (spair == null) { logger.error("mux is missing the stream for sink {}", split[i]); - } - else { + } else { if (activeStreams.remove(spair.component) != null) { spair.component.deactivate(); eventBus.publish(new StreamDeactivationEvent(spair)); @@ -388,8 +412,7 @@ public class StreamingContainer extends YarnContainerMain } } } - } - else { + } else { // it's either inline stream or it's bufferserver publisher. } @@ -417,8 +440,7 @@ public class StreamingContainer extends YarnContainerMain if (sourcePair == pair) { /* for some reason we had the stream stored against both source and sink identifiers */ streams.remove(pair.context.getSourceId()); - } - else { + } else { /* the stream was one of the many streams sourced by a muxstream */ unregisterSinkFromMux(sourcePair, sinkIdentifier); } @@ -440,7 +462,7 @@ public class StreamingContainer extends YarnContainerMain } if (found) { - ((Stream.MultiSinkCapableStream) muxpair.component).setSink(sinkIdentifier, null); + ((Stream.MultiSinkCapableStream)muxpair.component).setSink(sinkIdentifier, null); if (sinks.length == 1) { muxpair.context.setSinkId(null); @@ -450,8 +472,7 @@ public class StreamingContainer extends YarnContainerMain eventBus.publish(new StreamDeactivationEvent(muxpair)); } muxpair.component.teardown(); - } - else { + } else { StringBuilder builder = new StringBuilder(muxpair.context.getSinkId().length() - MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR.length() - sinkIdentifier.length()); found = false; @@ -459,8 +480,7 @@ public class StreamingContainer extends YarnContainerMain if (sinks[i] != null) { if (found) { builder.append(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR).append(sinks[i]); - } - else { + } else { builder.append(sinks[i]); found = true; } @@ -469,8 +489,7 @@ public class StreamingContainer extends YarnContainerMain muxpair.context.setSinkId(builder.toString()); } - } - else { + } else { logger.error("{} was not connected to stream connected to {}", sinkIdentifier, muxpair.context.getSourceId()); } @@ -509,11 +528,9 @@ public class StreamingContainer extends YarnContainerMain Node<?> node = nodes.get(operatorId); if (node == null) { throw new IllegalArgumentException("Node " + operatorId + " is not hosted in this container!"); - } - else if (toUndeploy.containsKey(operatorId)) { + } else if (toUndeploy.containsKey(operatorId)) { throw new IllegalArgumentException("Node " + operatorId + " is requested to be undeployed more than once"); - } - else { + } else { toUndeploy.put(operatorId, node); } } @@ -524,8 +541,7 @@ public class StreamingContainer extends YarnContainerMain Thread t = nodes.get(operatorId).context.getThread(); if (t == null || !t.isAlive()) { disconnectNode(operatorId); - } - else { + } else { joinList.add(t); discoList.add(operatorId); nodes.get(operatorId).shutdown(); @@ -542,8 +558,7 @@ public class StreamingContainer extends YarnContainerMain disconnectNode(iterator.next()); } logger.info("Undeploy complete."); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { logger.warn("Aborting wait for operators to get deactivated!", ex); } @@ -608,8 +623,7 @@ public class StreamingContainer extends YarnContainerMain synchronized (this.heartbeatTrigger) { try { this.heartbeatTrigger.wait(heartbeatIntervalMillis); - } - catch (InterruptedException e1) { + } catch (InterruptedException e1) { logger.warn("Interrupted in heartbeat loop, exiting.."); break; } @@ -626,7 +640,7 @@ public class StreamingContainer extends YarnContainerMain msg.restartRequested = true; } } - msg.memoryMBFree = ((int) (Runtime.getRuntime().freeMemory() / (1024 * 1024))); + msg.memoryMBFree = ((int)(Runtime.getRuntime().freeMemory() / (1024 * 1024))); garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); for (GarbageCollectorMXBean bean : garbageCollectorMXBeans) { msg.gcCollectionTime += bean.getCollectionTime(); @@ -655,11 +669,9 @@ public class StreamingContainer extends YarnContainerMain if (context.getThread() == null || context.getThread().getState() != Thread.State.TERMINATED) { hb.setState(DeployState.ACTIVE); - } - else if (failedNodes.contains(hb.nodeId)) { + } else if (failedNodes.contains(hb.nodeId)) { hb.setState(DeployState.FAILED); - } - else { + } else { logger.debug("Reporting SHUTDOWN state because thread is {} and failedNodes is {}", context.getThread(), failedNodes); hb.setState(DeployState.SHUTDOWN); } @@ -685,8 +697,7 @@ public class StreamingContainer extends YarnContainerMain synchronized (this.heartbeatTrigger) { try { this.heartbeatTrigger.wait(500); - } - catch (InterruptedException ie) { + } catch (InterruptedException ie) { logger.warn("Interrupted in heartbeat loop", ie); break; } @@ -709,7 +720,7 @@ public class StreamingContainer extends YarnContainerMain continue; } if (req instanceof StramToNodeChangeLoggersRequest) { - handleChangeLoggersRequest((StramToNodeChangeLoggersRequest) req); + handleChangeLoggersRequest((StramToNodeChangeLoggersRequest)req); continue; } @@ -725,14 +736,12 @@ public class StreamingContainer extends YarnContainerMain logger.warn("Received request with invalid operator id {} ({})", req.getOperatorId(), req); req.setDeleted(true); } - } - else { + } else { logger.debug("request received: {}", req); OperatorRequest requestExecutor = requestFactory.getRequestExecutor(nodes.get(req.operatorId), req); if (requestExecutor != null) { node.context.request(requestExecutor); - } - else { + } else { logger.warn("No executor identified for the request {}", req); } req.setDeleted(true); @@ -762,7 +771,7 @@ public class StreamingContainer extends YarnContainerMain @Override public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException { - ((Operator.CheckpointListener) operator).committed(lastCommittedWindowId); + ((Operator.CheckpointListener)operator).committed(lastCommittedWindowId); return null; } @@ -790,13 +799,11 @@ public class StreamingContainer extends YarnContainerMain logger.info("Deploy request: {}", rsp.deployRequest); try { deploy(rsp.deployRequest); - } - catch (Exception e) { + } catch (Exception e) { logger.error("deploy request failed", e); try { umbilical.log(this.containerId, "deploy request failed: " + rsp.deployRequest + " " + ExceptionUtils.getStackTrace(e)); - } - catch (IOException ioe) { + } catch (IOException ioe) { // ignore } this.exitHeartbeatLoop = true; @@ -874,11 +881,10 @@ public class StreamingContainer extends YarnContainerMain Context parentContext; if (ndi instanceof UnifierDeployInfo) { - OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo) ndi).operatorAttributes, containerContext); + OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext); parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext); massageUnifierDeployInfo(ndi); - } - else { + } else { parentContext = containerContext; } @@ -958,8 +964,7 @@ public class StreamingContainer extends YarnContainerMain deployBufferServerPublisher(connIdentifier, streamCodec, checkpointWindowId, queueCapacity, nodi); newStreams.put(sourceIdentifier, deployBufferServerPublisher.getValue()); node.connectOutputPort(nodi.portName, deployBufferServerPublisher.getValue().component); - } - else { + } else { /* * In this case we have 2 possibilities, either we have 1 inline or multiple streams. * Since we cannot tell at this point, we assume that we will have multiple streams and @@ -1001,12 +1006,11 @@ public class StreamingContainer extends YarnContainerMain String sinkIdentifier = pair.context.getSinkId(); if (sinkIdentifier == null) { pair.context.setSinkId(deployBufferServerPublisher.getKey()); - } - else { + } else { pair.context.setSinkId(sinkIdentifier.concat(", ").concat(deployBufferServerPublisher.getKey())); } - ((Stream.MultiSinkCapableStream) pair.component).setSink(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue().component); + ((Stream.MultiSinkCapableStream)pair.component).setSink(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue().component); } } } @@ -1069,8 +1073,7 @@ public class StreamingContainer extends YarnContainerMain if (ndi.checkpoint.windowId < smallestCheckpointedWindowId) { smallestCheckpointedWindowId = ndi.checkpoint.windowId; } - } - else { + } else { Node<?> node = nodes.get(ndi.id); for (OperatorDeployInfo.InputDeployInfo nidi : ndi.inputs) { @@ -1120,7 +1123,7 @@ public class StreamingContainer extends YarnContainerMain BufferServerSubscriber subscriber = fastPublisherSubscriber ? new FastSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity) : new BufferServerSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity); - if(streamCodec instanceof StreamCodecWrapperForPersistance) { + if (streamCodec instanceof StreamCodecWrapperForPersistance) { subscriber.acquireReservoirForPersistStream(sinkIdentifier, queueCapacity, streamCodec); } SweepableReservoir reservoir = subscriber.acquireReservoir(sinkIdentifier, queueCapacity); @@ -1131,8 +1134,7 @@ public class StreamingContainer extends YarnContainerMain newStreams.put(sinkIdentifier, new ComponentContextPair<Stream, StreamContext>(subscriber, context)); logger.debug("put input stream {} against key {}", subscriber, sinkIdentifier); - } - else { + } else { assert (nidi.locality == Locality.CONTAINER_LOCAL || nidi.locality == Locality.THREAD_LOCAL); /* we are still dealing with the MuxStream originating at the output of the source port */ StreamContext inlineContext = new StreamContext(nidi.declaredStreamId); @@ -1149,7 +1151,7 @@ public class StreamingContainer extends YarnContainerMain stream = new InlineStream(queueCapacity); if (checkpoint.windowId >= 0) { - node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, (SweepableReservoir) stream, checkpoint.windowId)); + node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, (SweepableReservoir)stream, checkpoint.windowId)); } break; @@ -1162,7 +1164,7 @@ public class StreamingContainer extends YarnContainerMain throw new IllegalStateException("Locality can be either ContainerLocal or ThreadLocal"); } - node.connectInputPort(nidi.portName, (SweepableReservoir) stream); + node.connectInputPort(nidi.portName, (SweepableReservoir)stream); newStreams.put(sinkIdentifier, new ComponentContextPair<Stream, StreamContext>(stream, inlineContext)); if (!(pair.component instanceof Stream.MultiSinkCapableStream)) { @@ -1187,27 +1189,26 @@ public class StreamingContainer extends YarnContainerMain if (streamCodec instanceof StreamCodecWrapperForPersistance) { PartitionAwareSinkForPersistence pas; if (nidi.partitionKeys == null) { - pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionMask, stream); + pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>)streamCodec, nidi.partitionMask, stream); } else { - pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream); + pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>)streamCodec, nidi.partitionKeys, nidi.partitionMask, stream); } - ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas); + ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas); } else if (nidi.partitionKeys == null || nidi.partitionKeys.isEmpty()) { - ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, stream); + ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, stream); } else { /* * generally speaking we do not have partitions on the inline streams so the control should not * come here but if it comes, then we are ready to handle it using the partition aware streams. */ - PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>(streamCodec == null ? nonSerializingStreamCodec : (StreamCodec<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream); - ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas); + PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>(streamCodec == null ? nonSerializingStreamCodec : (StreamCodec<Object>)streamCodec, nidi.partitionKeys, nidi.partitionMask, stream); + ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas); } String streamSinkId = pair.context.getSinkId(); if (streamSinkId == null) { pair.context.setSinkId(sinkIdentifier); - } - else { + } else { pair.context.setSinkId(streamSinkId.concat(", ").concat(sinkIdentifier)); } } @@ -1275,7 +1276,7 @@ public class StreamingContainer extends YarnContainerMain windowGenerator.setWindowWidth(windowWidthMillis); long windowCount = WindowGenerator.getWindowCount(millisAtFirstWindow, firstWindowMillis, windowWidthMillis); - windowGenerator.setCheckpointCount(checkpointWindowCount, (int) (windowCount % checkpointWindowCount)); + windowGenerator.setCheckpointCount(checkpointWindowCount, (int)(windowCount % checkpointWindowCount)); return windowGenerator; } @@ -1320,8 +1321,7 @@ public class StreamingContainer extends YarnContainerMain final Node<?> node = nodes.get(ndi.id); if (node == null) { logger.warn("node {}/{} took longer to exit, resulting in unclean undeploy!", ndi.id, ndi.name); - } - else { + } else { eventBus.publish(new NodeDeactivationEvent(node)); node.deactivate(); node.teardown(); @@ -1378,8 +1378,7 @@ public class StreamingContainer extends YarnContainerMain } node.run(); /* this is a blocking call */ - } - catch (Error error) { + } catch (Error error) { int[] operators; if (currentdi == null) { logger.error("Voluntary container termination due to an error in operator set {}.", setOperators, error); @@ -1388,39 +1387,33 @@ public class StreamingContainer extends YarnContainerMain for (Iterator<OperatorDeployInfo> it = setOperators.iterator(); it.hasNext(); i++) { operators[i] = it.next().id; } - } - else { + } else { logger.error("Voluntary container termination due to an error in operator {}.", currentdi, error); operators = new int[]{currentdi.id}; } umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error)); System.exit(1); - } - catch (Exception ex) { + } catch (Exception ex) { if (currentdi == null) { failedNodes.add(ndi.id); logger.error("Operator set {} stopped running due to an exception.", setOperators, ex); int[] operators = new int[]{ndi.id}; umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex)); - } - else { + } else { failedNodes.add(currentdi.id); logger.error("Abandoning deployment of operator {} due to setup failure.", currentdi, ex); int[] operators = new int[]{currentdi.id}; umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex)); } - } - finally { + } finally { if (setOperators.contains(ndi)) { try { teardownNode(ndi); - } - catch (Exception ex) { + } catch (Exception ex) { failedNodes.add(ndi.id); logger.error("Shutdown of operator {} failed due to an exception.", ndi, ex); } - } - else { + } else { signal.countDown(); } @@ -1431,13 +1424,11 @@ public class StreamingContainer extends YarnContainerMain if (setOperators.contains(oiodi)) { try { teardownNode(oiodi); - } - catch (Exception ex) { + } catch (Exception ex) { failedNodes.add(oiodi.id); logger.error("Shutdown of operator {} failed due to an exception.", oiodi, ex); } - } - else { + } else { signal.countDown(); } } @@ -1453,8 +1444,7 @@ public class StreamingContainer extends YarnContainerMain */ try { signal.await(); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { logger.debug("Activation of operators interruped.", ex); } @@ -1509,17 +1499,16 @@ public class StreamingContainer extends YarnContainerMain if (temp == null) { temp = containerContext.getValue(OperatorContext.APPLICATION_WINDOW_COUNT); } - int appWindowCount = (int) (windowCount % temp); + int appWindowCount = (int)(windowCount % temp); temp = ndi.contextAttributes.get(OperatorContext.CHECKPOINT_WINDOW_COUNT); if (temp == null) { temp = containerContext.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT); } - int lCheckpointWindowCount = (int) (windowCount % temp); + int lCheckpointWindowCount = (int)(windowCount % temp); checkpoint = new Checkpoint(WindowGenerator.getWindowId(now, firstWindowMillis, windowWidthMillis), appWindowCount, lCheckpointWindowCount); logger.debug("using {} on {} at {}", ProcessingMode.AT_MOST_ONCE, ndi.name, checkpoint); - } - else { + } else { checkpoint = ndi.checkpoint; logger.debug("using {} on {} at {}", ndi.contextAttributes == null ? ProcessingMode.AT_LEAST_ONCE : (ndi.contextAttributes.get(OperatorContext.PROCESSING_MODE) == null ? ProcessingMode.AT_LEAST_ONCE : @@ -1535,8 +1524,7 @@ public class StreamingContainer extends YarnContainerMain for (Component<ContainerContext> c : components) { c.setup(ctx); } - } - else { + } else { for (Component<ContainerContext> c : components) { c.teardown(); }
