shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284853798
##########
File path:
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
##########
@@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp,
TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not
available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
- metadataStore.delete(toStoreKey(ssp, taskName));
+ readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
}
/**
- * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this
method re-maps the Startpoints from
- * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all
tasks provided by the {@link JobModel}
+ * The Startpoints that are written to with {@link
#writeStartpoint(SystemStreamPartition, Startpoint)} and with
+ * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are
moved from a "read-write" namespace
+ * to a "fan out" namespace.
* This method is not atomic or thread-safe. The intent is for the Samza
Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
- * @param jobModel The {@link JobModel} is used to determine which {@link
TaskName} each {@link SystemStreamPartition} maps to.
- * @return The list of {@link SystemStreamPartition}s that were fanned out
to SystemStreamPartition+TaskName.
+ * @param taskToSSPs Determines which {@link TaskName} each {@link
SystemStreamPartition} maps to.
+ * @return The set of active {@link TaskName}s that were fanned out to.
*/
- public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel
jobModel) {
+ public Map<TaskName, Map<SystemStreamPartition, Startpoint>>
fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException
{
Preconditions.checkState(!stopped, "Underlying metadata store not
available");
- Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
-
- HashSet<SystemStreamPartition> sspsToDelete = new HashSet<>();
-
- // Inspect the job model for TaskName-to-SSPs mapping and re-map
startpoints from SSP-only keys to SSP+TaskName keys.
- for (ContainerModel containerModel: jobModel.getContainers().values()) {
- for (TaskModel taskModel : containerModel.getTasks().values()) {
- TaskName taskName = taskModel.getTaskName();
- for (SystemStreamPartition ssp :
taskModel.getSystemStreamPartitions()) {
- Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
- if (startpoint == null) {
- LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName);
- continue;
- }
-
- LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined
by the job model.", ssp);
- Startpoint startpointForTask = readStartpoint(ssp, taskName);
- if (startpointForTask == null ||
startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) {
- writeStartpoint(ssp, taskName, startpoint);
- sspsToDelete.add(ssp); // Mark for deletion
- LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp,
taskName);
- } else {
- LOG.info("Startpoint for SSP: {} and task: {} already exists and
will not be overwritten.", ssp, taskName);
- }
+ Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs
cannot be null or empty");
+ // construct fan out with the existing readWriteStore entries and mark the
entries for deletion after fan out
+ Instant now = Instant.now();
+ HashMultimap<SystemStreamPartition, TaskName> deleteKeys =
HashMultimap.create();
+ HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
+ for (TaskName taskName : taskToSSPs.keySet()) {
+ Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
+ if (CollectionUtils.isEmpty(ssps)) {
+ LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
+ continue;
+ }
+ for (SystemStreamPartition ssp : ssps) {
+ Optional<Startpoint> startpoint = readStartpoint(ssp); // Read
SSP-only key
+ startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
+
+ Optional<Startpoint> startpointForTask = readStartpoint(ssp,
taskName); // Read SSP+taskName key
+ startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
+
+ Optional<Startpoint> startpointWithPrecedence =
resolveStartpointPrecendence(startpoint, startpointForTask);
+ if (!startpointWithPrecedence.isPresent()) {
+ continue;
}
+
+ fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now));
+ fanOuts.get(taskName).getFanOuts().put(ssp,
startpointWithPrecedence.get());
}
}
- // Delete SSP-only keys
- sspsToDelete.forEach(ssp -> {
- deleteStartpoint(ssp);
- LOG.info("All Startpoints for SSP: {} have been grouped to the
appropriate tasks and the SSP was deleted.");
- });
+ if (fanOuts.isEmpty()) {
+ LOG.debug("No fan outs created.");
+ return ImmutableMap.of();
+ }
+
+ LOG.info("Fanning out to {} tasks", fanOuts.size());
- return ImmutableSet.copyOf(sspsToDelete);
+ // Fan out to store
+ for (TaskName taskName : fanOuts.keySet()) {
+ String fanOutKey = toFanOutStoreKey(taskName);
+ StartpointFanOutPerTask newFanOut = fanOuts.get(taskName);
+ fanOutStore.put(fanOutKey, objectMapper.writeValueAsBytes(newFanOut));
Review comment:
Consider this scenario.
1. Say a Samza job consumes from a single stream that has 10000
partitions. The `JobModel` of the job would then have 10000 tasks.
2. The user wants to consume all the `SystemStreamPartition`s from a
specific timestamp after restarting that Samza job.
3. The user publishes a startpoint with the timestamp set at the stream level.
4. After reading the published `StartpointTimestamp` from `Namespace-1` of the
`MetadataStore`, we would issue around 10000 individual sequential
writes to `Namespace-2` of the `MetadataStore`. This entails 10000 RTTs to
the storage layer. Rather than making one batch write, we're making N+1
writes (where N is the number of topic partitions consumed by the Samza job).
5. Since this fan-out of `SystemStreamPartition`s happens in the Samza
`ApplicationMaster` startup sequence, it will significantly delay the
allocation of containers by the `ApplicationMaster` and increase the number of
messages behind the high watermark. The more topic partitions the Samza job
consumes, the longer the processing delay.
The existing metadata-store API doesn't support batch puts, and hence you had
to opt for individual fanned-out `SystemStreamPartition` writes.
What do you think? Do you agree that this will cause performance issues?
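To make the concern concrete, here is a minimal sketch of the difference
between per-task puts and a single batch put. Everything in it is an
assumption for illustration: `BatchingStore`, its `putAll` method, the
in-memory `CountingStore`, and the `fanout-task-` key format are hypothetical
and are not part of the existing metadata-store API or of this PR. The store
only counts simulated round trips; it does not measure real latency.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class BatchFanOutSketch {

  /** Hypothetical store interface; putAll does NOT exist in the current metadata-store API. */
  interface BatchingStore {
    void put(String key, byte[] value);
    void putAll(Map<String, byte[]> entries);
  }

  /** In-memory stand-in that counts one simulated round trip per call. */
  static class CountingStore implements BatchingStore {
    final Map<String, byte[]> data = new HashMap<>();
    int roundTrips = 0;

    @Override
    public void put(String key, byte[] value) {
      roundTrips++; // one round trip per individual write
      data.put(key, value);
    }

    @Override
    public void putAll(Map<String, byte[]> entries) {
      roundTrips++; // one round trip for the whole batch
      data.putAll(entries);
    }
  }

  /** Builds one placeholder fan-out entry per task (key format is made up). */
  static Map<String, byte[]> buildFanOuts(int taskCount) {
    Map<String, byte[]> fanOuts = new LinkedHashMap<>();
    for (int i = 0; i < taskCount; i++) {
      fanOuts.put("fanout-task-" + i, ("startpoint-" + i).getBytes(StandardCharsets.UTF_8));
    }
    return fanOuts;
  }

  /** Mirrors the loop in this PR: one put per task. */
  static int sequentialRoundTrips(int taskCount) {
    CountingStore store = new CountingStore();
    for (Map.Entry<String, byte[]> entry : buildFanOuts(taskCount).entrySet()) {
      store.put(entry.getKey(), entry.getValue());
    }
    return store.roundTrips;
  }

  /** The alternative: hand all fan-outs to the store in a single call. */
  static int batchedRoundTrips(int taskCount) {
    CountingStore store = new CountingStore();
    store.putAll(buildFanOuts(taskCount));
    return store.roundTrips;
  }

  public static void main(String[] args) {
    System.out.println("sequential round trips: " + sequentialRoundTrips(10000));
    System.out.println("batched round trips:    " + batchedRoundTrips(10000));
  }
}
```

With 10000 tasks the sequential path makes 10000 simulated round trips and the
batched path makes one. Whether a real backing store could apply such a batch
in a single request depends on the store, so treat this as a starting point
for discussion rather than a proposed interface.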