rmatharu commented on a change in pull request #987: SAMZA-2158: Remove the 
redundant coordinator stream reads in the ApplicationMaster startup sequence.
URL: https://github.com/apache/samza/pull/987#discussion_r273175100
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
 ##########
 @@ -46,38 +49,42 @@
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.util.CoordinatorStreamUtil;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An implementation of the {@link MetadataStore} interface where the metadata 
of the Samza job is stored in coordinator stream.
+ * An implementation of the {@link MetadataStore} interface where the metadata 
of the samza job is stored in coordinator stream.
  *
  * This class is thread safe.
+ *
+ * It is recommended to use {@link NamespaceAwareCoordinatorStreamStore}. This 
will enable the single CoordinatorStreamStore connection
+ * to be shared by the multiple {@link NamespaceAwareCoordinatorStreamStore} 
instances.
  */
 public class CoordinatorStreamStore implements MetadataStore {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorStreamStore.class);
   private static final String SOURCE = "SamzaContainer";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   private final Config config;
   private final SystemStream coordinatorSystemStream;
   private final SystemStreamPartition coordinatorSystemStreamPartition;
   private final SystemProducer systemProducer;
   private final SystemConsumer systemConsumer;
   private final SystemAdmin systemAdmin;
-  private final String type;
-  private final CoordinatorStreamKeySerde keySerde;
 
-  private final Map<String, byte[]> bootstrappedMessages = new HashMap<>();
+  // Namespaced key to the message byte array.
+  private final Map<String, byte[]> bootstrappedMessages = new 
ConcurrentHashMap<>();
 
 Review comment:
   Maybe just call this "messagesRead", and rename 
"bootstrapMessagesFromStream()" to "readMessagesFromStream()"?
   The comment can then be updated to "map storing messages read, indexed by the 
namespace-key".
   
   "Bootstrapping" doesn't mean much here; all this method does is read messages 
from the stream.
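
   A minimal sketch of what the suggested naming could look like. This is not 
the code in the PR: the class name, the "namespace.key" layout, and the 
String[] record shape are assumptions made only to keep the example 
self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for CoordinatorStreamStore, reduced to the renamed members.
public class MessagesReadSketch {

  // Map storing messages read, indexed by the namespace-key.
  private final Map<String, byte[]> messagesRead = new ConcurrentHashMap<>();

  // Assumed key layout for illustration; the real serialization may differ.
  static String namespacedKey(String namespace, String key) {
    return namespace + "." + key;
  }

  // Renamed from bootstrapMessagesFromStream(): it only reads records and
  // stores them. Each record is {namespace, key, value}; a later record for
  // the same namespaced key overwrites the earlier one.
  void readMessagesFromStream(Iterable<String[]> records) {
    for (String[] record : records) {
      String mapKey = namespacedKey(record[0], record[1]);
      messagesRead.put(mapKey, record[2].getBytes(StandardCharsets.UTF_8));
    }
  }

  // Returns the last value read for the namespaced key, or null if none was read.
  byte[] get(String namespace, String key) {
    return messagesRead.get(namespacedKey(namespace, key));
  }
}
```

   With these names, the field and the method describe what the code does 
(read and index messages) rather than implying a separate bootstrap phase.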
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
