cameronlee314 commented on a change in pull request #1027: SAMZA-2046: 
Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r283543587
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##########
 @@ -173,71 +226,151 @@ public void deleteStartpoint(SystemStreamPartition ssp, 
TaskName taskName) {
     Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
-    metadataStore.delete(toStoreKey(ssp, taskName));
+    readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
   }
 
   /**
-   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this 
method re-maps the Startpoints from
-   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all 
tasks provided by the {@link JobModel}
+   * The Startpoints that are written to with {@link 
#writeStartpoint(SystemStreamPartition, Startpoint)} and with
+   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are 
moved from a "read-write" namespace
+   * to a "fan out" namespace.
    * This method is not atomic or thread-safe. The intent is for the Samza 
Processor's coordinator to use this
    * method to assign the Startpoints to the appropriate tasks.
-   * @param jobModel The {@link JobModel} is used to determine which {@link 
TaskName} each {@link SystemStreamPartition} maps to.
-   * @return The list of {@link SystemStreamPartition}s that were fanned out 
to SystemStreamPartition+TaskName.
+   * @param taskToSSPs Determines which {@link TaskName} each {@link 
SystemStreamPartition} maps to.
+   * @return The set of active {@link TaskName}s that were fanned out to.
    */
-  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel 
jobModel) {
+  public Set<TaskName> fanOut(Map<TaskName, Set<SystemStreamPartition>> 
taskToSSPs) {
     Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
-    Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
-
-    HashSet<SystemStreamPartition> sspsToDelete = new HashSet<>();
-
-    // Inspect the job model for TaskName-to-SSPs mapping and re-map 
startpoints from SSP-only keys to SSP+TaskName keys.
-    for (ContainerModel containerModel: jobModel.getContainers().values()) {
-      for (TaskModel taskModel : containerModel.getTasks().values()) {
-        TaskName taskName = taskModel.getTaskName();
-        for (SystemStreamPartition ssp : 
taskModel.getSystemStreamPartitions()) {
-          Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
-          if (startpoint == null) {
-            LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName);
-            continue;
-          }
-
-          LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined 
by the job model.", ssp);
-          Startpoint startpointForTask = readStartpoint(ssp, taskName);
-          if (startpointForTask == null || 
startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) {
-            writeStartpoint(ssp, taskName, startpoint);
-            sspsToDelete.add(ssp); // Mark for deletion
-            LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, 
taskName);
-          } else {
-            LOG.info("Startpoint for SSP: {} and task: {} already exists and 
will not be overwritten.", ssp, taskName);
-          }
+    Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs 
cannot be null or empty");
+
+    // construct fan out with the existing readWriteStore entries and mark the 
entries for deletion after fan out
+    Instant now = Instant.now();
+    HashMultimap<SystemStreamPartition, TaskName> deleteKeys = 
HashMultimap.create();
+    HashMap<TaskName, StartpointFanOut> fanOuts = new HashMap<>();
+    for (TaskName taskName : taskToSSPs.keySet()) {
+      Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
+      if (CollectionUtils.isEmpty(ssps)) {
+        LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
+        continue;
+      }
+      for (SystemStreamPartition ssp : ssps) {
+        Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
+        if (startpoint != null) {
+          deleteKeys.put(ssp, null);
+        }
+
+        Startpoint startpointForTask = readStartpoint(ssp, taskName); // Read 
SSP+taskName key
+        if (startpointForTask != null) {
+          deleteKeys.put(ssp, taskName);
+        }
+
+        Startpoint startpointWithPrecedence = 
resolveStartpointPrecendence(startpoint, startpointForTask);
+        if (startpointWithPrecedence == null) {
+          continue;
+        }
+
+        fanOuts.putIfAbsent(taskName, new StartpointFanOut(now, new 
HashMap<>()));
+        fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence);
+      }
+    }
+
+    if (fanOuts.isEmpty()) {
+      LOG.debug("No fan outs created.");
+      return ImmutableSet.of();
+    }
+
+    LOG.info("Fanning out to {} tasks", fanOuts.size());
+
+    StartpointFanOut.StartpointFanOutSerde startpointFanOutSerde = new 
StartpointFanOut.StartpointFanOutSerde();
+    for (TaskName taskName : fanOuts.keySet()) {
+      String fanOutKey = toFanOutStoreKey(taskName);
+      StartpointFanOut newFanOut = fanOuts.get(taskName);
+      byte[] fanOutFromStoreInBytes = fanOutStore.get(fanOutKey);
+
+      if (ArrayUtils.isNotEmpty(fanOutFromStoreInBytes)) {
+        // Merge new fan out with existing fan out. This is not a typical 
scenario but it can happen when the job
 
 Review comment:
   Is there any concern here regarding the non-atomic merge? Is there a case where 
the job coordinator could be merging a startpoint at the same time as a 
processing container is deleting that startpoint?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to