This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ccae8f9  SAMZA-2152: Bugfix: Making KafkaSystemAdmin's metadataConsumer 
accesses thread-safe, enabling StreamRegexMonitors only when required
ccae8f9 is described below

commit ccae8f992007cec2f960bd1608b0c61539ec9a42
Author: Ray Matharu <[email protected]>
AuthorDate: Wed Apr 3 11:56:24 2019 -0700

    SAMZA-2152: Bugfix: Making KafkaSystemAdmin's metadataConsumer accesses 
thread-safe, enabling StreamRegexMonitors only when required
    
    StreamPartitionCountMonitor and StreamRegexMonitor use the 
KafkaSystemAdmin, which has a kafka-consumer.
    Kafka consumer is not thread-safe, therefore explicit synchronization is 
required for metadata accesses.
    
    StreamRegexMonitor is not to be enabled when there is no regex input.
    
    Author: Ray Matharu <[email protected]>
    
    Reviewers: Prateek Maheshwari <[email protected]>
    
    Closes #981 from rmatharu/bugfix-regexmonitor
---
 .../clustermanager/ClusterBasedJobCoordinator.java | 16 ++++--
 .../samza/system/kafka/KafkaSystemAdmin.java       | 64 +++++++++++++---------
 2 files changed, 50 insertions(+), 30 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0eddbf2..490db24 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -340,7 +340,7 @@ public class ClusterBasedJobCoordinator {
 
     // if no rewriter is defined, there is nothing to monitor
     if (!rewritersList.isDefined()) {
-      log.warn("No config rewriters are defined. No StreamRegexMonitor 
created.");
+      log.info("No config rewriters are defined. No StreamRegexMonitor 
created.");
       return Optional.empty();
     }
 
@@ -349,14 +349,22 @@ public class ClusterBasedJobCoordinator {
         JavaConverters.mapAsJavaMapConverter(new 
JobConfig(config).getMonitorRegexPatternMap(rewritersList.get()))
             .asJava();
 
+    // if there are no regexes to monitor
+    if (inputRegexesToMonitor.isEmpty()) {
+      log.info("No input regexes are defined. No StreamRegexMonitor created.");
+      return Optional.empty();
+    }
+
     return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor, 
inputRegexesToMonitor, streamMetadata, metrics,
         new JobConfig(config).getMonitorRegexFrequency(), new 
StreamRegexMonitor.Callback() {
           @Override
           public void onInputStreamsChanged(Set<SystemStream> initialInputSet, 
Set<SystemStream> newInputStreams,
               Map<String, Pattern> regexesMonitored) {
-            log.error("New input system-streams discovered. Failing the job. 
New input streams: {}", newInputStreams,
-                " Existing input streams:", inputStreamsToMonitor);
-            state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+            if (hasDurableStores) {
+              log.error("New input system-streams discovered. Failing the job. 
New input streams: {}", newInputStreams,
+                  " Existing input streams:", inputStreamsToMonitor);
+              state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+            }
             coordinatorException = new InputStreamsDiscoveredException("New 
input streams added: " + newInputStreams);
           }
         }));
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index a208835..a82a6b9 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -90,7 +90,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
   public static volatile boolean deleteMessageCalled = false;
 
   protected final String systemName;
-  protected final Consumer metadataConsumer;
+  private final Consumer metadataConsumer; // Need to synchronize all 
accesses, since KafkaConsumer is not thread-safe
   protected final Config config;
 
   // Custom properties to create a new coordinator stream.
@@ -231,11 +231,13 @@ public class KafkaSystemAdmin implements SystemAdmin {
             streamNames.forEach(streamName -> {
               Map<Partition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new 
HashMap<>();
 
-              List<PartitionInfo> partitionInfos = 
metadataConsumer.partitionsFor(streamName);
+              List<PartitionInfo> partitionInfos;
+              synchronized (metadataConsumer) {
+                partitionInfos = metadataConsumer.partitionsFor(streamName);
+              }
               LOG.debug("Stream {} has partitions {}", streamName, 
partitionInfos);
-
-              partitionInfos.forEach(partitionInfo -> 
partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
-
+              partitionInfos.forEach(
+                  partitionInfo -> partitionMetadata.put(new 
Partition(partitionInfo.partition()), dummySspm));
               allMetadata.put(streamName, new SystemStreamMetadata(streamName, 
partitionMetadata));
             });
 
@@ -381,11 +383,15 @@ public class KafkaSystemAdmin implements SystemAdmin {
     Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
     Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
     Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();
-
-    Map<TopicPartition, Long> oldestOffsetsWithLong = 
metadataConsumer.beginningOffsets(topicPartitions);
-    LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
-    Map<TopicPartition, Long> upcomingOffsetsWithLong = 
metadataConsumer.endOffsets(topicPartitions);
-    LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
+    Map<TopicPartition, Long> oldestOffsetsWithLong;
+    Map<TopicPartition, Long> upcomingOffsetsWithLong;
+
+    synchronized (metadataConsumer) {
+      oldestOffsetsWithLong = 
metadataConsumer.beginningOffsets(topicPartitions);
+      LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
+      upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
+      LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
+    }
 
     oldestOffsetsWithLong.forEach((topicPartition, offset) -> 
oldestOffsets.put(toSystemStreamPartition(topicPartition), 
String.valueOf(offset)));
 
@@ -424,21 +430,23 @@ public class KafkaSystemAdmin implements SystemAdmin {
     LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", 
topics, systemName);
 
     topics.forEach(topic -> {
-      List<PartitionInfo> partitionInfos = 
metadataConsumer.partitionsFor(topic);
+      synchronized (metadataConsumer) {
+        List<PartitionInfo> partitionInfos = 
metadataConsumer.partitionsFor(topic);
 
-      if (partitionInfos == null) {
-        String msg = String.format("Partition info not(yet?) available for 
system %s topic %s", systemName, topic);
-        throw new SamzaException(msg);
-      }
+        if (partitionInfos == null) {
+          String msg = String.format("Partition info not(yet?) available for 
system %s topic %s", systemName, topic);
+          throw new SamzaException(msg);
+        }
 
-      List<TopicPartition> topicPartitions = partitionInfos.stream()
-          .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), 
partitionInfo.partition()))
-          .collect(Collectors.toList());
+        List<TopicPartition> topicPartitions = partitionInfos.stream()
+            .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), 
partitionInfo.partition()))
+            .collect(Collectors.toList());
 
-      OffsetsMaps offsetsForTopic = 
fetchTopicPartitionsMetadata(topicPartitions);
-      allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
-      allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
-      allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
+        OffsetsMaps offsetsForTopic = 
fetchTopicPartitionsMetadata(topicPartitions);
+        allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
+        allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
+        allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
+      }
     });
 
     return assembleMetadata(allOldestOffsets, allNewestOffsets, 
allUpcomingOffsets);
@@ -572,7 +580,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
     Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
     List<PartitionInfo> partitionInfoList;
     for (String topic : topics) {
-      partitionInfoList = metadataConsumer.partitionsFor(topic);
+      synchronized (metadataConsumer) {
+        partitionInfoList = metadataConsumer.partitionsFor(topic);
+      }
       streamToPartitionsInfo.put(topic, partitionInfoList);
     }
 
@@ -629,9 +639,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
 
   @Override
   public Set<SystemStream> getAllSystemStreams() {
-    return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
-        .map(x -> new SystemStream(systemName, x))
-        .collect(Collectors.toSet());
+    synchronized (metadataConsumer) {
+      return ((Set<String>) 
this.metadataConsumer.listTopics().keySet()).stream()
+          .map(x -> new SystemStream(systemName, x))
+          .collect(Collectors.toSet());
+    }
   }
 
   /**

Reply via email to