dnishimura commented on a change in pull request #990: SAMZA-2161: Move
ChangelogPartitionManager and CoordinatorStream ConfigReader to MetadataStore.
URL: https://github.com/apache/samza/pull/990#discussion_r274040815
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
##########
@@ -28,68 +29,83 @@
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * The Changelog manager creates the changelog stream. If a coordinator stream
manager is provided,
- * it can be used to read, write and update the changelog stream
partition-to-task mapping.
+ * Responsible for creating the changelog stream. Used for reading, writing
+ * and updating the task to changelog stream partition association in metadata
store.
*/
public class ChangelogStreamManager {
private static final Logger LOG =
LoggerFactory.getLogger(ChangelogStreamManager.class);
- // This is legacy for changelog. Need to investigate what happens if you use
a different source name
- private static final String SOURCE = "JobModelManager";
- private final CoordinatorStreamManager coordinatorStreamManager;
+ private final MetadataStore metadataStore;
+ private final CoordinatorStreamValueSerde valueSerde;
/**
- * Construct changelog manager with a bootstrapped coordinator stream.
+ * Builds the ChangelogStreamManager based upon the provided {@link
MetadataStore} that is instantiated.
+ * Setting up a metadata store instance is expensive which requires opening
multiple connections
+ * and reading tons of information. Fully instantiated metadata store is
taken as a constructor argument
+ * to reuse it across different utility classes. Uses the {@link
CoordinatorStreamValueSerde} to serialize
+ * messages before reading/writing into metadata store.
*
- * @param coordinatorStreamManager Coordinator stream manager.
+ * @param metadataStore an instance of {@link MetadataStore} to read/write
the task-to-changelog-partition mapping.
*/
- public ChangelogStreamManager(CoordinatorStreamManager
coordinatorStreamManager) {
- this.coordinatorStreamManager = coordinatorStreamManager;
+ public ChangelogStreamManager(MetadataStore metadataStore) {
+ this.metadataStore = metadataStore;
+ this.valueSerde = new
CoordinatorStreamValueSerde(SetChangelogMapping.TYPE);
}
/**
- * Read the taskName to partition mapping that is being maintained by this
ChangelogManager
+ * Reads the taskName to changelog partition assignments from the {@link
MetadataStore}.
+ *
* @return TaskName to change LOG partition mapping, or an empty map if
there were no messages.
*/
public Map<TaskName, Integer> readPartitionMapping() {
LOG.debug("Reading changelog partition information");
- final HashMap<TaskName, Integer> changelogMapping = new HashMap<>();
- for (CoordinatorStreamMessage coordinatorStreamMessage :
coordinatorStreamManager.getBootstrappedStream(SetChangelogMapping.TYPE)) {
- SetChangelogMapping changelogMapEntry = new
SetChangelogMapping(coordinatorStreamMessage);
- changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()),
changelogMapEntry.getPartition());
- LOG.debug("TaskName: {} is mapped to {}",
changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
- }
+ final Map<TaskName, Integer> changelogMapping = new HashMap<>();
+ metadataStore.all().forEach((taskName, partitionIdAsBytes) -> {
+ String partitionId = valueSerde.fromBytes(partitionIdAsBytes);
+ LOG.debug("TaskName: {} is mapped to {}", taskName, partitionId);
+ if (partitionId != null) {
Review comment:
It is probably safer to replace the null check with
`StringUtils.isNotBlank(partitionId)`, so that empty or whitespace-only
partition ids are skipped as well, not just null ones.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services