This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ccae8f9 SAMZA-2152: Bugfix: Making KafkaSystemAdmin's metadataConsumer
accesses thread-safe, enabling StreamRegexMonitors only when required
ccae8f9 is described below
commit ccae8f992007cec2f960bd1608b0c61539ec9a42
Author: Ray Matharu <[email protected]>
AuthorDate: Wed Apr 3 11:56:24 2019 -0700
SAMZA-2152: Bugfix: Making KafkaSystemAdmin's metadataConsumer accesses
thread-safe, enabling StreamRegexMonitors only when required
StreamPartitionCountMonitor and StreamRegexMonitor use the
KafkaSystemAdmin, which has a kafka-consumer.
Kafka consumer is not thread-safe, therefore explicit synchronization is
required for metadata accesses.
StreamRegexMonitor is not to be enabled when there is no regex input.
Author: Ray Matharu <[email protected]>
Reviewers: Prateek Maheshwari <[email protected]>
Closes #981 from rmatharu/bugfix-regexmonitor
---
.../clustermanager/ClusterBasedJobCoordinator.java | 16 ++++--
.../samza/system/kafka/KafkaSystemAdmin.java | 64 +++++++++++++---------
2 files changed, 50 insertions(+), 30 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0eddbf2..490db24 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -340,7 +340,7 @@ public class ClusterBasedJobCoordinator {
// if no rewriter is defined, there is nothing to monitor
if (!rewritersList.isDefined()) {
- log.warn("No config rewriters are defined. No StreamRegexMonitor
created.");
+ log.info("No config rewriters are defined. No StreamRegexMonitor
created.");
return Optional.empty();
}
@@ -349,14 +349,22 @@ public class ClusterBasedJobCoordinator {
JavaConverters.mapAsJavaMapConverter(new
JobConfig(config).getMonitorRegexPatternMap(rewritersList.get()))
.asJava();
+ // if there are no regexes to monitor
+ if (inputRegexesToMonitor.isEmpty()) {
+ log.info("No input regexes are defined. No StreamRegexMonitor created.");
+ return Optional.empty();
+ }
+
return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor,
inputRegexesToMonitor, streamMetadata, metrics,
new JobConfig(config).getMonitorRegexFrequency(), new
StreamRegexMonitor.Callback() {
@Override
public void onInputStreamsChanged(Set<SystemStream> initialInputSet,
Set<SystemStream> newInputStreams,
Map<String, Pattern> regexesMonitored) {
- log.error("New input system-streams discovered. Failing the job.
New input streams: {}", newInputStreams,
- " Existing input streams:", inputStreamsToMonitor);
- state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ if (hasDurableStores) {
+ log.error("New input system-streams discovered. Failing the job.
New input streams: {}", newInputStreams,
+ " Existing input streams:", inputStreamsToMonitor);
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ }
coordinatorException = new InputStreamsDiscoveredException("New
input streams added: " + newInputStreams);
}
}));
diff --git
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index a208835..a82a6b9 100644
---
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -90,7 +90,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
public static volatile boolean deleteMessageCalled = false;
protected final String systemName;
- protected final Consumer metadataConsumer;
+ private final Consumer metadataConsumer; // Need to synchronize all
accesses, since KafkaConsumer is not thread-safe
protected final Config config;
// Custom properties to create a new coordinator stream.
@@ -231,11 +231,13 @@ public class KafkaSystemAdmin implements SystemAdmin {
streamNames.forEach(streamName -> {
Map<Partition,
SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new
HashMap<>();
- List<PartitionInfo> partitionInfos =
metadataConsumer.partitionsFor(streamName);
+ List<PartitionInfo> partitionInfos;
+ synchronized (metadataConsumer) {
+ partitionInfos = metadataConsumer.partitionsFor(streamName);
+ }
LOG.debug("Stream {} has partitions {}", streamName,
partitionInfos);
-
- partitionInfos.forEach(partitionInfo ->
partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
-
+ partitionInfos.forEach(
+ partitionInfo -> partitionMetadata.put(new
Partition(partitionInfo.partition()), dummySspm));
allMetadata.put(streamName, new SystemStreamMetadata(streamName,
partitionMetadata));
});
@@ -381,11 +383,15 @@ public class KafkaSystemAdmin implements SystemAdmin {
Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();
-
- Map<TopicPartition, Long> oldestOffsetsWithLong =
metadataConsumer.beginningOffsets(topicPartitions);
- LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
- Map<TopicPartition, Long> upcomingOffsetsWithLong =
metadataConsumer.endOffsets(topicPartitions);
- LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
+ Map<TopicPartition, Long> oldestOffsetsWithLong;
+ Map<TopicPartition, Long> upcomingOffsetsWithLong;
+
+ synchronized (metadataConsumer) {
+ oldestOffsetsWithLong =
metadataConsumer.beginningOffsets(topicPartitions);
+ LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
+ upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
+ LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
+ }
oldestOffsetsWithLong.forEach((topicPartition, offset) ->
oldestOffsets.put(toSystemStreamPartition(topicPartition),
String.valueOf(offset)));
@@ -424,21 +430,23 @@ public class KafkaSystemAdmin implements SystemAdmin {
LOG.info("Fetching SystemStreamMetadata for topics {} on system {}",
topics, systemName);
topics.forEach(topic -> {
- List<PartitionInfo> partitionInfos =
metadataConsumer.partitionsFor(topic);
+ synchronized (metadataConsumer) {
+ List<PartitionInfo> partitionInfos =
metadataConsumer.partitionsFor(topic);
- if (partitionInfos == null) {
- String msg = String.format("Partition info not(yet?) available for
system %s topic %s", systemName, topic);
- throw new SamzaException(msg);
- }
+ if (partitionInfos == null) {
+ String msg = String.format("Partition info not(yet?) available for
system %s topic %s", systemName, topic);
+ throw new SamzaException(msg);
+ }
- List<TopicPartition> topicPartitions = partitionInfos.stream()
- .map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
- .collect(Collectors.toList());
+ List<TopicPartition> topicPartitions = partitionInfos.stream()
+ .map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
+ .collect(Collectors.toList());
- OffsetsMaps offsetsForTopic =
fetchTopicPartitionsMetadata(topicPartitions);
- allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
- allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
- allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
+ OffsetsMaps offsetsForTopic =
fetchTopicPartitionsMetadata(topicPartitions);
+ allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
+ allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
+ allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
+ }
});
return assembleMetadata(allOldestOffsets, allNewestOffsets,
allUpcomingOffsets);
@@ -572,7 +580,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
List<PartitionInfo> partitionInfoList;
for (String topic : topics) {
- partitionInfoList = metadataConsumer.partitionsFor(topic);
+ synchronized (metadataConsumer) {
+ partitionInfoList = metadataConsumer.partitionsFor(topic);
+ }
streamToPartitionsInfo.put(topic, partitionInfoList);
}
@@ -629,9 +639,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
@Override
public Set<SystemStream> getAllSystemStreams() {
- return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
- .map(x -> new SystemStream(systemName, x))
- .collect(Collectors.toSet());
+ synchronized (metadataConsumer) {
+ return ((Set<String>)
this.metadataConsumer.listTopics().keySet()).stream()
+ .map(x -> new SystemStream(systemName, x))
+ .collect(Collectors.toSet());
+ }
}
/**