mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477895944



##########
File path: 
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -167,15 +167,18 @@ object JobModelManager extends Logging {
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = 
{

Review comment:
       Putting the points from our offline discussion. This is a helper method 
within the `JobModelManager` that does some massaging on the locality 
information and decorates it with its own terminology for the absence of 
locality `ANY_HOST`. The helper method could carry a perception that this is 
just getting locality information but in theory it is not. Also, this goes over 
the list of all available containers and populates `ANY_HOST` for the ones that 
don't have locality. This information is oblivious to the `LocalityManager` as 
it only treats coordinator stream as the source of truth which only knows of 
container that has locality data.

##########
File path: 
samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
##########
@@ -558,14 +548,19 @@ public Void answer(InvocationOnMock invocation) {
   public void testContainerPlacementsForJobRunningInDegradedState() throws 
Exception {
     // Set failure after retries to false to enable job running in degraded 
state
     config = new MapConfig(configVals, 
getConfigWithHostAffinityAndRetries(true, 1, false));
-    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+    state = new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false));
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);

Review comment:
       extracted the locality manager to instance variable to reuse them in the 
tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to