[ 
https://issues.apache.org/jira/browse/GOBBLIN-1781?focusedWorklogId=844200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-844200
 ]

ASF GitHub Bot logged work on GOBBLIN-1781:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Feb/23 00:03
            Start Date: 08/Feb/23 00:03
    Worklog Time Spent: 10m 
      Work Description: AndyJiang99 commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099445812


##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, 
RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);

Review Comment:
   The mock annotations doesn't seem to run these mocks, still had to reinit 
these. But I got rid of the mock annotations at the top and it's good to go



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, 
RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = 
Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), 
any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), 
anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, fs, this.eventBus);
+  }
+
+  /**
+   * Testing the race condition between the yarn start up and creating yarn 
container request
+   * Block on creating new yarn containers until start up of the yarn service 
and purging is complete
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception{
+    boolean canRequestNewContainers = 
this.yarnService.requestTargetNumberOfContainers(new 
YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Not allowed to request target number of containers since yarnService 
hasn't started up yet.
+    Assert.assertFalse(canRequestNewContainers);
+
+    // Start the yarn service
+    this.yarnService.startUp();
+    canRequestNewContainers = 
this.yarnService.requestTargetNumberOfContainers(new 
YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Allowed to request target number of containers after yarnService is 
started up.
+    Assert.assertTrue(canRequestNewContainers);
+  }
+
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus, getMockHelixManager(config), getMockHelixAdmin());
+    }
+
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      HelixDataAccessor helixDataAccessor = 
Mockito.mock(HelixDataAccessor.class);
+      PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+      PropertyKey.Builder propertyKeyBuilder = 
Mockito.mock(PropertyKey.Builder.class);
+
+      
Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+      
Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+      
Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      
Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      
Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+      
Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+      
Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);
+
+      return helixManager;
+    }
+
+    private static HelixAdmin getMockHelixAdmin() {
+      HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+      
Mockito.doNothing().when(helixAdmin).purgeOfflineInstances(Mockito.anyString(), 
Mockito.anyLong());
+      Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyBoolean());

Review Comment:
   These aren't used





Issue Time Tracking
-------------------

    Worklog Id:     (was: 844200)
    Time Spent: 50m  (was: 40m)

> Helix offline instance purging is not thread safe in the yarn service
> ---------------------------------------------------------------------
>
>                 Key: GOBBLIN-1781
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1781
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Andy Jiang
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Helix instances are purged during startup of the yarn service. This operation 
> must be done without new helix instances being added or removed (i.e. the API 
> call is not thread safe).
>  
> The current implementation blocks the yarn service from allocating initial 
> containers while the helix instance purging is enabled, but it does not 
> prevent other external services from requesting containers through its public 
> methods.
> These 2 services start up concurrently, and it's possible that the 
> AutoScalingYarnManager starts up before the Yarn Service is completely 
> finished purging. This means leads to the AutoScalingYarnManager to 
> requestContainers while the instances are still purging.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to