lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538903941
##########
File path:
samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
##########
@@ -192,4 +196,38 @@ public void testToArgs() {
assertEquals(expected.size(), actual.size());
assertTrue(actual.containsAll(expected));
}
+
+ @Test
+ public void testGenerateAndUpdateJobCoordinatorMetadata() {
+ Config jobConfig = new MapConfig(configMap);
+
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+ ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+
spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+ JobCoordinatorMetadata previousMetadata =
mock(JobCoordinatorMetadata.class);
+ JobCoordinatorMetadata newMetadata = mock(JobCoordinatorMetadata.class);
+ JobCoordinatorMetadataManager jobCoordinatorMetadataManager =
mock(JobCoordinatorMetadataManager.class);
+ JobModel mockJobModel = mock(JobModel.class);
+
+
when(jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
+ when(jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(any(),
any())).thenReturn(newMetadata);
+ when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(false);
+
when(clusterBasedJobCoordinator.createJobCoordinatorMetadataManager()).thenReturn(jobCoordinatorMetadataManager);
+
+ /*
+ * Verify if there are no changes to metadata, the metadata changed flag
remains false and no interactions
+ * with job coordinator metadata manager
+ */
+
clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+ assertFalse("JC metadata should remain unchanged",
clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
+ verify(jobCoordinatorMetadataManager,
times(0)).writeJobCoordinatorMetadata(any());
+
+ /*
+ * Verify if there are changes to metadata, we persist the new metadata &
update the metadata changed flag
+ */
+ when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(true);
+
clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+ assertTrue("JC metadata should be true",
clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
Review comment:
nit: include "changed" in the assertion message, i.e. "JC metadata changed should be true"
##########
File path:
samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
##########
@@ -272,10 +273,60 @@ public void run() {
// Verify only 1 was requested with allocator
assertEquals(1, allocator.requestedContainers);
+ assertTrue("Ensure no processors were forcefully restarted",
callback.resourceStatuses.isEmpty());
cpm.stop();
}
+ @Test
+ public void testOnInitToForceRestartAMHighAvailability() throws Exception {
+ Map<String, String> configMap = new HashMap<>(configVals);
+ configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");
+ Config conf = new MapConfig(configMap);
+
+ SamzaApplicationState state = new
SamzaApplicationState(getJobModelManager(2));
+ state.runningProcessors.put("0", new SamzaResource(1, 1024, "host", "0"));
+
+ MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
+ ClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
+ ContainerManager containerManager =
+ buildContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
+
+ ContainerProcessManager cpm =
+ buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.empty(), true);
+
+ MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
+ clusterResourceManager,
+ conf,
+ state,
+ containerManager);
+
+ getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
+ CountDownLatch latch = new CountDownLatch(1);
+ getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
+ public void run() {
+ isRunning = true;
+ latch.countDown();
+ }
+ });
+
+ cpm.start();
Review comment:
Should we also verify that ClusterResourceManager.stopStreamProcessor
was invoked, and with the correct SamzaResource?
##########
File path:
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource,
CommandBuilder builder
public void stopStreamProcessor(SamzaResource resource) {
synchronized (lock) {
Container container = allocatedResources.get(resource);
+ /*
+ * 1. Stop the container through NMClient if the container was
instantiated as part of NMClient lifecycle.
+ * 2. Stop the container through AMClient by release the assigned
container if the container was from the previous
+ * attempt and managed by the AM due to AM-HA
+ * 3. Ignore the request if the container associated with the resource
isn't present in the bookeeping.
+ */
if (container != null) {
log.info("Stopping Container ID: {} on host: {}",
resource.getContainerId(), resource.getHost());
this.nmClientAsync.stopContainerAsync(container.getId(),
container.getNodeId());
+ } else {
+ YarnContainer yarnContainer =
state.runningProcessors.get(resource.getContainerId());
+ if (yarnContainer != null) {
+ log.info("Stopping container from previous attempt with Container
ID: {} on host: {}",
+ resource.getContainerId(), resource.getHost());
+ amClient.releaseAssignedContainer(yarnContainer.id());
Review comment:
After amClient.releaseAssignedContainer(yarnContainer.id()) completes,
which callback do we receive?
We should use that callback to restart the container, right? (Earlier we
allocated resources only for processors that were not running in the
previous attempt.)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]