Repository: samza Updated Branches: refs/heads/master a460a82e3 -> 023a7ce23
SAMZA-948: Samza CoordinatorStreamSystemConsumer is not thread-safe Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/023a7ce2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/023a7ce2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/023a7ce2 Branch: refs/heads/master Commit: 023a7ce233631c14c40ee5241e13a298362d07b2 Parents: a460a82 Author: Jacob Maes <[email protected]> Authored: Thu May 12 11:07:49 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu May 12 11:07:49 2016 -0700 ---------------------------------------------------------------------- .../stream/CoordinatorStreamSystemConsumer.java | 75 +++++++++++--------- .../samza/coordinator/JobCoordinator.scala | 2 +- .../TestCoordinatorStreamSystemConsumer.java | 23 ------ 3 files changed, 43 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java index 8e1057b..0a6661c 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java @@ -19,13 +19,13 @@ package org.apache.samza.coordinator.stream; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -58,15 +58,16 @@ public class CoordinatorStreamSystemConsumer { private final SystemConsumer systemConsumer; private final SystemAdmin systemAdmin; private final Map<String, String> configMap; - private boolean isBootstrapped; - private boolean isStarted; - private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new LinkedHashSet<CoordinatorStreamMessage>(); + private volatile boolean isStarted; + private volatile boolean isBootstrapped; + private final Object bootstrapLock = new Object(); + private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet(); public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) { this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0)); this.systemConsumer = systemConsumer; this.systemAdmin = systemAdmin; - this.configMap = new HashMap<String, String>(); + this.configMap = new HashMap(); this.isBootstrapped = false; this.keySerde = keySerde; this.messageSerde = messageSerde; @@ -139,38 +140,46 @@ public class CoordinatorStreamSystemConsumer { * Currently, this method only pays attention to config messages. */ public void bootstrap() { - log.info("Bootstrapping configuration from coordinator stream."); - SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition); + synchronized (bootstrapLock) { + // Make a copy so readers aren't affected while we modify the set. + final LinkedHashSet<CoordinatorStreamMessage> bootstrappedMessages = new LinkedHashSet<>(bootstrappedStreamSet); - try { - while (iterator.hasNext()) { - IncomingMessageEnvelope envelope = iterator.next(); - Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray(); - Map<String, Object> valueMap = null; - if (envelope.getMessage() != null) { - valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage()); - } - CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); - log.debug("Received coordinator stream message: {}", coordinatorStreamMessage); - // Remove any existing entry. Set.add() does not add if the element already exists. - if (bootstrappedStreamSet.remove(coordinatorStreamMessage)) { - log.debug("Removed duplicate message: {}", coordinatorStreamMessage); - } - bootstrappedStreamSet.add(coordinatorStreamMessage); - if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) { - String configKey = coordinatorStreamMessage.getKey(); - if (coordinatorStreamMessage.isDelete()) { - configMap.remove(configKey); - } else { - String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue(); - configMap.put(configKey, configValue); + log.info("Bootstrapping configuration from coordinator stream."); + SystemStreamPartitionIterator iterator = + new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition); + + try { + while (iterator.hasNext()) { + IncomingMessageEnvelope envelope = iterator.next(); + Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray(); + Map<String, Object> valueMap = null; + if (envelope.getMessage() != null) { + valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage()); + } + CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); + log.debug("Received coordinator stream message: {}", coordinatorStreamMessage); + // Remove any existing entry. Set.add() does not add if the element already exists. + if (bootstrappedMessages.remove(coordinatorStreamMessage)) { + log.debug("Removed duplicate message: {}", coordinatorStreamMessage); + } + bootstrappedMessages.add(coordinatorStreamMessage); + if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) { + String configKey = coordinatorStreamMessage.getKey(); + if (coordinatorStreamMessage.isDelete()) { + configMap.remove(configKey); + } else { + String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue(); + configMap.put(configKey, configValue); + } } } + + bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages); + log.debug("Bootstrapped configuration: {}", configMap); + isBootstrapped = true; + } catch (Exception e) { + throw new SamzaException(e); } - log.debug("Bootstrapped configuration: {}", configMap); - isBootstrapped = true; - } catch (Exception e) { - throw new SamzaException(e); } } http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 03f48db..bd7f3f5 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -165,8 +165,8 @@ object JobCoordinator extends Logging { // Do grouping to fetch TaskName to SSP mapping val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache) val grouper = getSystemStreamPartitionGrouper(config) - info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:") val groups = grouper.group(allSystemStreamPartitions) + info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet())) // Initialize the ChangelogPartitionManager and the CheckpointManager val previousChangelogMapping = if (changelogManager != null) http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java index 9499027..03dcbb1 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java @@ -22,7 +22,6 @@ package org.apache.samza.coordinator.stream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -70,7 +69,6 @@ public class TestCoordinatorStreamSystemConsumer { // Expected. } consumer.bootstrap(); - assertTrue(testOrder(consumer.getBoostrappedStream())); assertEquals(expectedConfig, consumer.getConfig()); assertFalse(systemConsumer.isStopped()); consumer.stop(); @@ -94,27 +92,6 @@ public class TestCoordinatorStreamSystemConsumer { assertEquals(1, systemConsumer.getRegisterCount()); } - private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) { - int initialSize = bootstrappedStreamSet.size(); - List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>(); - listStreamMessages.add(new SetConfig("order1", "job.name.order1", "my-order1-name")); - listStreamMessages.add(new SetConfig("order2", "job.name.order2", "my-order2-name")); - listStreamMessages.add(new SetConfig("order3", "job.name.order3", "my-order3-name")); - bootstrappedStreamSet.addAll(listStreamMessages); - Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator(); - - for (int i = 0; i < initialSize; ++i) { - iter.next(); - } - int i = 0; - while (iter.hasNext()) { - if (!iter.next().getKey().equals(listStreamMessages.get(i++).getKey())) { - return false; - } - } - return true; - } - /** * Verify that if a particular key-value is written, then another, then the original again, * that the original occurs last in the set.
