shanthoosh commented on a change in pull request #990: SAMZA-2161: Move 
ChangelogPartitionManager and CoordinatorStream ConfigReader to MetadataStore.
URL: https://github.com/apache/samza/pull/990#discussion_r274058884
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
 ##########
 @@ -28,68 +29,83 @@
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.SystemConfig;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.StreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * The Changelog manager creates the changelog stream. If a coordinator stream 
manager is provided,
- * it can be used to read, write and update the changelog stream 
partition-to-task mapping.
+ * Responsible for creating the changelog stream. Used for reading, writing
+ * and updating the task to changelog stream partition association in metadata 
store.
  */
 public class ChangelogStreamManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogStreamManager.class);
-  // This is legacy for changelog. Need to investigate what happens if you use 
a different source name
-  private static final String SOURCE = "JobModelManager";
 
-  private final CoordinatorStreamManager coordinatorStreamManager;
+  private final MetadataStore metadataStore;
+  private final CoordinatorStreamValueSerde valueSerde;
 
   /**
-   * Construct changelog manager with a bootstrapped coordinator stream.
+   * Builds the ChangelogStreamManager based upon the provided {@link 
MetadataStore} that is instantiated.
+   * Setting up a metadata store instance is expensive which requires opening 
multiple connections
+   * and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   * to reuse it across different utility classes. Uses the {@link 
CoordinatorStreamValueSerde} to serialize
+   * messages before reading/writing into metadata store.
    *
-   * @param coordinatorStreamManager Coordinator stream manager.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write 
the task to changelog partition mapping.
    */
-  public ChangelogStreamManager(CoordinatorStreamManager 
coordinatorStreamManager) {
-    this.coordinatorStreamManager = coordinatorStreamManager;
+  public ChangelogStreamManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetChangelogMapping.TYPE);
   }
 
   /**
-   * Read the taskName to partition mapping that is being maintained by this 
ChangelogManager
+   * Reads the taskName to changelog partition assignments from the {@link 
MetadataStore}.
+   *
    * @return TaskName to changelog partition mapping, or an empty map if 
there were no messages.
    */
   public Map<TaskName, Integer> readPartitionMapping() {
     LOG.debug("Reading changelog partition information");
-    final HashMap<TaskName, Integer> changelogMapping = new HashMap<>();
-    for (CoordinatorStreamMessage coordinatorStreamMessage : 
coordinatorStreamManager.getBootstrappedStream(SetChangelogMapping.TYPE)) {
-      SetChangelogMapping changelogMapEntry = new 
SetChangelogMapping(coordinatorStreamMessage);
-      changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), 
changelogMapEntry.getPartition());
-      LOG.debug("TaskName: {} is mapped to {}", 
changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
-    }
+    final Map<TaskName, Integer> changelogMapping = new HashMap<>();
+    metadataStore.all().forEach((taskName, partitionIdAsBytes) -> {
+        String partitionId = valueSerde.fromBytes(partitionIdAsBytes);
+        LOG.debug("TaskName: {} is mapped to {}", taskName, partitionId);
+        if (partitionId != null) {
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to