dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284074141
##########
File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
##########
@@ -173,71 +205,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
- metadataStore.delete(toStoreKey(ssp, taskName));
+ readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
}
/**
- * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from
- * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel}
+ * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
+ * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
+ * to a "fan out" namespace.
* This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
- * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to.
- * @return The list of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName.
+ * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
+ * @return The set of active {@link TaskName}s that were fanned out to.
*/
- public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {
+ public Set<TaskName> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {
Review comment:
Doesn't hurt to. I'll add it.