SAMZA-816: avoid starting coordinator consumer in LocalityManager in SamzaContainer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/429f2458 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/429f2458 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/429f2458 Branch: refs/heads/samza-sql Commit: 429f245839bd359c1375302fa488d8c96ca83a45 Parents: e8a2ef5 Author: Yi Pan <[email protected]> Authored: Fri Nov 20 01:19:28 2015 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Nov 20 01:19:28 2015 -0800 ---------------------------------------------------------------------- .../apache/samza/container/LocalityManager.java | 43 +++++- .../AbstractCoordinatorStreamManager.java | 12 +- .../apache/samza/container/SamzaContainer.scala | 3 +- .../samza/container/TestLocalityManager.java | 140 +++++++++++++++++++ .../MockCoordinatorStreamSystemFactory.java | 107 ++++++++++++++ 5 files changed, 299 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index d19b574..86c9e9b 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -37,11 +37,29 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; public class LocalityManager extends AbstractCoordinatorStreamManager { private static final Logger log = LoggerFactory.getLogger(LocalityManager.class); private Map<Integer, Map<String, String>> containerToHostMapping; + private final boolean writeOnly; + /** + * Default constructor that creates a read-write manager + * + * @param coordinatorStreamProducer producer to the coordinator stream + * @param coordinatorStreamConsumer consumer for the coordinator stream + */ public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, CoordinatorStreamSystemConsumer coordinatorStreamConsumer) { super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaContainer-"); this.containerToHostMapping = new HashMap<>(); + this.writeOnly = coordinatorStreamConsumer == null; + } + + /** + * Special constructor that creates a write-only {@link LocalityManager} that only writes + * to coordinator stream in {@link SamzaContainer} + * + * @param coordinatorStreamSystemProducer producer to the coordinator stream + */ + public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) { + this(coordinatorStreamSystemProducer, null); } /** @@ -59,11 +77,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager { * @param sourceSuffix the source suffix which is a container id */ public void register(String sourceSuffix) { - registerCoordinatorStreamConsumer(); + if (!this.writeOnly) { + registerCoordinatorStreamConsumer(); + } registerCoordinatorStreamProducer(getSource() + sourceSuffix); } + /** + * Method to allow read container locality information from coordinator stream. This method is used + * in {@link org.apache.samza.coordinator.JobCoordinator}. + * + * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress) + */ public Map<Integer, Map<String, String>> readContainerLocality() { + if (this.writeOnly) { + throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager"); + } + Map<Integer, Map<String, String>> allMappings = new HashMap<>(); for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) { SetContainerHostMapping mapping = new SetContainerHostMapping(message); @@ -78,6 +108,14 @@ public class LocalityManager extends AbstractCoordinatorStreamManager { return allMappings; } + /** + * Method to write locality info to coordinator stream. This method is used in {@link SamzaContainer}. + * + * @param containerId the {@link SamzaContainer} ID + * @param hostName the hostname + * @param jmxAddress the JMX URL address + * @param jmxTunnelingAddress the JMX Tunnel URL address + */ public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) { Map<String, String> existingMappings = containerToHostMapping.get(containerId); String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null; @@ -86,7 +124,8 @@ public class LocalityManager extends AbstractCoordinatorStreamManager { } else { log.info("Container {} started at {}", containerId, hostName); } - send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress, jmxTunnelingAddress)); + send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress, + jmxTunnelingAddress)); Map<String, String> mappings = new HashMap<>(); mappings.put(SetContainerHostMapping.HOST_KEY, hostName); mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress); http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java index ca97ce8..211b642 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java @@ -49,14 +49,18 @@ public abstract class AbstractCoordinatorStreamManager { */ public void start() { coordinatorStreamProducer.start(); - coordinatorStreamConsumer.start(); + if (coordinatorStreamConsumer != null) { + coordinatorStreamConsumer.start(); + } } /** * Stops the underlying coordinator stream producer and consumer. */ public void stop() { - coordinatorStreamConsumer.stop(); + if (coordinatorStreamConsumer != null) { + coordinatorStreamConsumer.stop(); + } coordinatorStreamProducer.stop(); } @@ -74,6 +78,10 @@ public abstract class AbstractCoordinatorStreamManager { * @return a set of {@link CoordinatorStreamMessage} if messages exists for the given source, else an empty set */ public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) { + if (coordinatorStreamConsumer == null) { + throw new UnsupportedOperationException(String.format("CoordinatorStreamConsumer is not initialized in the AbstractCoordinatorStreamManager. " + + "manager registered source: %s, input source: %s", this.source, source)); + } return coordinatorStreamConsumer.getBootstrappedStream(source); } http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index db6074b..ddce148 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -304,9 +304,8 @@ object SamzaContainer extends Logging { info("Got metrics reporters: %s" format reporters.keys) - val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry) val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry) - val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer) + val localityManager = new LocalityManager(coordinatorSystemProducer) val checkpointManager = config.getCheckpointManagerFactory() match { case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) => Util http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java new file mode 100644 index 0000000..9661885 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; +import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer; +import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer; +import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; +import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; + + +/** + * Unit tests for {@link LocalityManager} + */ +public class TestLocalityManager { + + private final MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory = + new MockCoordinatorStreamSystemFactory(); + private final Config config = new MapConfig( + new HashMap<String, String>() { + { + this.put("job.name", "test-job"); + this.put("job.coordinator.system", "test-kafka"); + } + }); + + @Before + public void setup() { + MockCoordinatorStreamSystemFactory.enableMockConsumerCache(); + } + + @After + public void tearDown() { + MockCoordinatorStreamSystemFactory.disableMockConsumerCache(); + } + + @Test public void testLocalityManager() throws Exception { + MockCoordinatorStreamSystemProducer producer = + mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null); + MockCoordinatorStreamSystemConsumer consumer = + mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null); + LocalityManager localityManager = new LocalityManager(producer, consumer); + + try { + localityManager.register(new TaskName("task-0")); + fail("Should have thrown UnsupportedOperationException"); + } catch (UnsupportedOperationException uoe) { + // expected + } + + localityManager.register("containerId-0"); + assertTrue(producer.isRegistered()); + assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-0"); + assertTrue(consumer.isRegistered()); + + localityManager.start(); + assertTrue(producer.isStarted()); + assertTrue(consumer.isStarted()); + + localityManager.writeContainerToHostMapping(0, "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090"); + Map<Integer, Map<String, String>> localMap = localityManager.readContainerLocality(); + Map<Integer, Map<String, String>> expectedMap = + new HashMap<Integer, Map<String, String>>() { + { + this.put(new Integer(0), + new HashMap<String, String>() { + { + this.put(SetContainerHostMapping.HOST_KEY, "localhost"); + this.put(SetContainerHostMapping.JMX_URL_KEY, "jmx:localhost:8080"); + this.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, "jmx:tunnel:localhost:9090"); + } + }); + } + }; + assertEquals(expectedMap, localMap); + + localityManager.stop(); + assertTrue(producer.isStopped()); + assertTrue(consumer.isStopped()); + } + + @Test public void testWriteOnlyLocalityManager() { + MockCoordinatorStreamSystemProducer producer = + mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null); + LocalityManager localityManager = new LocalityManager(producer); + + localityManager.register("containerId-1"); + assertTrue(producer.isRegistered()); + assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-1"); + + localityManager.start(); + assertTrue(producer.isStarted()); + + localityManager.writeContainerToHostMapping(1, "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191"); + try { + localityManager.readContainerLocality(); + fail("Should have thrown UnsupportedOperationException"); + } catch (UnsupportedOperationException uoe) { + // expected + } + assertEquals(producer.getEnvelopes().size(), 1); + CoordinatorStreamMessage coordinatorStreamMessage = + MockCoordinatorStreamSystemFactory.deserializeCoordinatorStreamMessage(producer.getEnvelopes().get(0)); + + SetContainerHostMapping expectedContainerMap = + new SetContainerHostMapping("SamzaContainer-1", "1", "localhost", "jmx:localhost:8181", + "jmx:tunnel:localhost:9191"); + assertEquals(expectedContainerMap, coordinatorStreamMessage); + + localityManager.stop(); + assertTrue(producer.isStopped()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java index 9d8c98e..e0d4aa1 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java @@ -22,10 +22,13 @@ package org.apache.samza.coordinator.stream; import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; +import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.serializers.JsonSerde; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -36,6 +39,9 @@ import org.apache.samza.util.Util; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertNotNull; /** @@ -59,6 +65,14 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { mockConsumer = null; } + public static CoordinatorStreamMessage deserializeCoordinatorStreamMessage(OutgoingMessageEnvelope msg) { + JsonSerde<List<?>> keySerde = new JsonSerde<>(); + Object[] keyArray = keySerde.fromBytes((byte[]) msg.getKey()).toArray(); + JsonSerde<Map<String, Object>> msgSerde = new JsonSerde<>(); + Map<String, Object> valueMap = msgSerde.fromBytes((byte[]) msg.getMessage()); + return new CoordinatorStreamMessage(keyArray, valueMap); + } + /** * Returns a consumer that sends all configs to the coordinator stream. * @@ -87,6 +101,13 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { return mockConsumer; } + private SystemStream getCoordinatorSystemStream(Config config) { + assertNotNull(config.get("job.coordinator.system")); + assertNotNull(config.get("job.name")); + return new SystemStream(config.get("job.coordinator.system"), Util.getCoordinatorStreamName(config.get("job.name"), + config.get("job.id") == null ? "1" : config.get("job.id"))); + } + /** * Returns a MockCoordinatorSystemProducer. */ @@ -94,6 +115,18 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { return new MockSystemProducer(null); } + public MockCoordinatorStreamSystemConsumer getCoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) { + return new MockCoordinatorStreamSystemConsumer(getCoordinatorSystemStream(config), + getConsumer(config.get("job.coordinator.system"), config, registry), + getAdmin(config.get("job.coordinator.system"), config)); + } + + public MockCoordinatorStreamSystemProducer getCoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) { + return new MockCoordinatorStreamSystemProducer(getCoordinatorSystemStream(config), + getProducer(config.get("job.coordinator.system"), config, registry), + getAdmin(config.get("job.coordinator.system"), config)); + } + /** * Returns a single partition admin that pretends to create a coordinator * stream. @@ -102,6 +135,74 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { return new MockSystemAdmin(); } + public static final class MockCoordinatorStreamSystemConsumer extends CoordinatorStreamSystemConsumer { + private final MockCoordinatorStreamWrappedConsumer consumer; + private boolean isRegistered = false; + private boolean isStarted = false; + + public MockCoordinatorStreamSystemConsumer(SystemStream stream, SystemConsumer consumer, SystemAdmin admin) { + super(stream, consumer, admin); + this.consumer = (MockCoordinatorStreamWrappedConsumer) consumer; + } + + public MockCoordinatorStreamWrappedConsumer getConsumer() { + return this.consumer; + } + + public void register() { + isRegistered = true; + } + + public void start() { + isStarted = true; + } + + public void stop() { + isStarted = false; + } + + public boolean isRegistered() { + return isRegistered; + } + + public boolean isStarted() { + return isStarted; + } + + public boolean isStopped() { + return !isStarted; + } + } + + public static final class MockCoordinatorStreamSystemProducer extends CoordinatorStreamSystemProducer { + private final MockSystemProducer producer; + + public MockCoordinatorStreamSystemProducer(SystemStream stream, SystemProducer producer, SystemAdmin admin) { + super(stream, producer, admin); + this.producer = (MockSystemProducer) producer; + } + + public boolean isRegistered() { + return this.producer.isRegistered(); + } + + public String getRegisteredSource() { + return this.producer.getRegisteredSource(); + } + + public boolean isStarted() { + return this.producer.isStarted(); + } + + public boolean isStopped() { + return this.producer.isStopped(); + } + + public List<OutgoingMessageEnvelope> getEnvelopes() { + return this.producer.getEnvelopes(); + } + } + public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { public void createCoordinatorStream(String streamName) { // Do nothing. @@ -113,6 +214,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { private final List<OutgoingMessageEnvelope> envelopes; private boolean started = false; private boolean registered = false; + private String registeredSource = null; private boolean flushed = false; public MockSystemProducer(String expectedSource) { @@ -131,6 +233,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { public void register(String source) { registered = true; + registeredSource = source; } public void send(String source, OutgoingMessageEnvelope envelope) { @@ -175,5 +278,9 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { public String getExpectedSource() { return expectedSource; } + + public String getRegisteredSource() { + return registeredSource; + } } }
