shanthoosh commented on a change in pull request #1484:
URL: https://github.com/apache/samza/pull/1484#discussion_r606419141
##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -274,6 +283,20 @@ void doOnProcessorChange() {
LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
JobModel newJobModel = generateNewJobModel(processorNodes);
+ /*
+ * The leader skips the rebalance, even if there are changes in the quorum, as long as the work assignment remains the
+ * same across all the processors. The optimization is useful in the following scenarios:
+ * 1. A processor in the quorum restarts within the debounce window. Originally, this would trigger a rebalance
+ * across the processors, stopping and starting their work assignments, which is detrimental to the availability of
+ * the system (e.g. a common scenario during rolling upgrades).
+ * 2. Processors in the quorum that don't have a work assignment fail or restart; such failures/restarts don't impact
+ * the quorum.
+ */
+ if (newJobModel.equals(activeJobModel)) {
Review comment:
This check compares both the config and the work assignments when verifying equality between two JobModels.
Standalone allows users to launch two processors with different configs in the same quorum, so it would be better to narrow this check to the work assignments alone to achieve the desired effect.
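A minimal sketch of the narrower check suggested above, assuming a simplified stand-in for `JobModel`: the hypothetical `SimpleJobModel` holds a config map plus a processorId-to-tasks assignment map, and the hypothetical `RebalanceCheck.hasSameWorkAssignment` compares only the assignments. In Samza the assignment side would presumably come from `JobModel.getContainers()`; the types here are illustrative, not the real API.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified stand-in for Samza's JobModel (not the real class):
// config plus a processorId -> task-name assignment map.
final class SimpleJobModel {
  final Map<String, String> config;
  final Map<String, Set<String>> assignments;

  SimpleJobModel(Map<String, String> config, Map<String, Set<String>> assignments) {
    this.config = config;
    this.assignments = assignments;
  }

  // Full equality, analogous to newJobModel.equals(activeJobModel):
  // a config-only difference makes the two models unequal.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof SimpleJobModel)) return false;
    SimpleJobModel other = (SimpleJobModel) o;
    return config.equals(other.config) && assignments.equals(other.assignments);
  }

  @Override
  public int hashCode() {
    return Objects.hash(config, assignments);
  }
}

final class RebalanceCheck {
  // Narrowed check: the leader may skip the rebalance when every processor keeps
  // exactly the same set of tasks, regardless of any config differences.
  static boolean hasSameWorkAssignment(SimpleJobModel proposed, SimpleJobModel active) {
    return active != null && proposed.assignments.equals(active.assignments);
  }
}
```

With two processors whose configs differ but whose tasks are unchanged, full equality reports a change (and would trigger a rebalance), while the narrowed check lets the leader skip it.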
##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
##########
@@ -27,6 +27,7 @@
* - /
* |- groupId/
* |- JobModelGeneration/
+ * |- activeJobModelVersion (data contains the most recent active job model version)
Review comment:
Semantically, what is the difference between activeJobModelVersion and JobModelVersion? In what scenarios would the two differ?
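One plausible reading, inferred from the `startWorkWithLastActiveJobModel` hunk in this review rather than confirmed by the author: `JobModelVersion` moves as soon as the leader publishes a new job model, while `activeJobModelVersion` only advances once the quorum has agreed on that model, so the two differ while a rebalance is in flight. The hypothetical `VersionState` class below models that reading; its method names are illustrative, not Samza APIs.

```java
// Hypothetical model of the two ZK nodes under JobModelGeneration/ (illustrative only).
final class VersionState {
  private String jobModelVersion;        // latest version published by the leader
  private String activeJobModelVersion;  // version the quorum last agreed on

  // Leader publishes a new job model: only the latest version moves.
  void publish(String version) {
    jobModelVersion = version;
  }

  // Consensus (e.g. a barrier) completed for the published version: it becomes active.
  void markActive() {
    activeJobModelVersion = jobModelVersion;
  }

  // True while a rebalance is in flight: published but not yet agreed on.
  boolean rebalanceInFlight() {
    return jobModelVersion != null && !jobModelVersion.equals(activeJobModelVersion);
  }

  String jobModelVersion() { return jobModelVersion; }

  String activeJobModelVersion() { return activeJobModelVersion; }
}
```

Under this reading, a processor that finds the two values equal at startup can reuse the active model, and a mismatch signals a concurrent rebalance.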
##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -474,11 +497,55 @@ void setActiveJobModel(JobModel jobModel) {
activeJobModel = jobModel;
}
+ @VisibleForTesting
+ void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
+ debounceTimer = scheduleAfterDebounceTime;
+ }
+
@VisibleForTesting
void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade barrierUpgradeForVersion) {
barrier = barrierUpgradeForVersion;
}
+ /**
+ * Start the processor with the last known active job model. It is safe to start with the last active job model
+ * version in all scenarios except in the event of a concurrent rebalance. Here, "safe" means that no
+ * two processors in the quorum have overlapping work assignments.
+ * In case of a concurrent rebalance, there are two scenarios:
+ * 1. Job model version update happens before processor registration
+ * 2. Job model version update happens after processor registration
+ * Since ZK guarantees FIFO order for client operations, the processor is guaranteed to see all the state up until its
+ * own registration.
+ * For scenario 1, due to the above guarantee, the processor will not start with the old assignment because of the
+ * mismatch between the latest and last active versions. (If there is no mismatch, the scenario reduces to one of the
+ * safe scenarios.)
+ *
+ * For scenario 2, the processor may not see the leader's writes about the job model version change,
+ * but it will eventually receive a notification on the job model version change and act on it (potentially stopping
+ * the work assignment if it's not part of the job model).
+ *
+ * In the scenario where the processor doesn't start with the last active job model version, it will continue to follow
+ * the old protocol, where the leader is notified about the processor registration, potentially triggers a
+ * rebalance, and notifies the processors about changes in work assignment after consensus.
+ * TODO: SAMZA-2635: Rebalances in standalone doesn't handle DAG changes for restarted processor
+ */
+ @VisibleForTesting
+ void startWorkWithLastActiveJobModel() {
+ LOG.info("Starting the processor with the recent active job model");
+ String lastActiveJobModelVersion = zkUtils.getLastActiveJobModelVersion();
+ String latestJobModelVersion = zkUtils.getJobModelVersion();
+
+ if (lastActiveJobModelVersion != null && lastActiveJobModelVersion.equals(latestJobModelVersion)) {
+ final JobModel lastActiveJobModel = readJobModelFromMetadataStore(lastActiveJobModelVersion);
+
+ /*
+ * TODO: A temporary workaround since job model can be started only if the stream processor is in the rebalance
Review comment:
Can you please create a follow-up ticket for this in Samza and link it here? That ensures this context is not lost in code comments alone.
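The startup decision in `startWorkWithLastActiveJobModel` can be summarized as a small sketch. It assumes hypothetical stand-ins: a `Store` interface in place of `zkUtils` and the metadata store, and job models reduced to processorId-to-tasks maps. The version check mirrors the hunk; the membership check on the model is an assumption of this sketch, since the truncated hunk does not show what the real method does after reading the model.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for zkUtils plus the job model metadata store.
interface Store {
  String lastActiveJobModelVersion();
  String latestJobModelVersion();
  Map<String, Set<String>> readJobModel(String version); // processorId -> tasks
}

final class StartupDecision {
  enum Action { START_WITH_LAST_ACTIVE, FOLLOW_REBALANCE_PROTOCOL }

  // Mirrors the version check in startWorkWithLastActiveJobModel: reuse the last
  // active model only if no newer version has been published since.
  static Action onStart(Store store, String processorId) {
    String lastActive = store.lastActiveJobModelVersion();
    String latest = store.latestJobModelVersion();
    if (lastActive != null && lastActive.equals(latest)) {
      Map<String, Set<String>> model = store.readJobModel(lastActive);
      // Assumption: only reuse the model if this processor has an assignment in it.
      if (model.containsKey(processorId)) {
        return Action.START_WITH_LAST_ACTIVE;
      }
    }
    // Scenario 1 (version mismatch) or no prior assignment: register and wait
    // for the leader to rebalance, as in the old protocol.
    return Action.FOLLOW_REBALANCE_PROTOCOL;
  }

  // Scenario 2: a later version-change notification arrives; keep working only
  // if this processor still has an assignment in the new model.
  static boolean shouldKeepWorking(Map<String, Set<String>> newModel, String processorId) {
    return newModel.containsKey(processorId);
  }
}
```

Returning an `Action` rather than starting work directly keeps the decision testable in isolation, which is in the same spirit as the `@VisibleForTesting` hooks in the hunk.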
--
This is an automated message from the Apache Git Service.