[
https://issues.apache.org/jira/browse/GOBBLIN-1781?focusedWorklogId=844200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-844200
]
ASF GitHub Bot logged work on GOBBLIN-1781:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Feb/23 00:03
Start Date: 08/Feb/23 00:03
Worklog Time Spent: 10m
Work Description: AndyJiang99 commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099445812
##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+ final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+ private TestYarnService yarnService;
+ private Config config;
+ private YarnConfiguration clusterConf = new YarnConfiguration();
+ private final EventBus eventBus = new EventBus("YarnServiceTest");
+ @Mock
+ AMRMClientAsync mockAMRMClient;
+ @Mock
+ RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+ @Mock
+ Resource mockResource;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
Review Comment:
The mock annotations don't seem to initialize these mocks, so I still had to
re-init them here. I got rid of the mock annotations at the top and it's good to go
##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+ final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+ private TestYarnService yarnService;
+ private Config config;
+ private YarnConfiguration clusterConf = new YarnConfiguration();
+ private final EventBus eventBus = new EventBus("YarnServiceTest");
+ @Mock
+ AMRMClientAsync mockAMRMClient;
+ @Mock
+ RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+ @Mock
+ Resource mockResource;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+ mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+ mockResource = Mockito.mock(Resource.class);
+
+ URL url = YarnServiceTest.class.getClassLoader()
+ .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+ Assert.assertNotNull(url, "Could not find resource " + url);
+
+ this.config = ConfigFactory.parseURL(url).resolve();
+
+ PowerMockito.mockStatic(AMRMClientAsync.class);
+ PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+ when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+ .thenReturn(mockAMRMClient);
+ doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+ when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+ .thenReturn(mockRegisterApplicationMasterResponse);
+ when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+ .thenReturn(mockResource);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+
+ // Create the test yarn service, but don't start yet
+ this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+ this.clusterConf, fs, this.eventBus);
+ }
+
+ /**
+ * Testing the race condition between the yarn start up and creating yarn container request
+ * Block on creating new yarn containers until start up of the yarn service and purging is complete
+ */
+ @Test(groups = {"gobblin.yarn"})
+ public void testYarnStartUpFirst() throws Exception{
+ boolean canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+ // Not allowed to request target number of containers since yarnService hasn't started up yet.
+ Assert.assertFalse(canRequestNewContainers);
+
+ // Start the yarn service
+ this.yarnService.startUp();
+ canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+ // Allowed to request target number of containers after yarnService is started up.
+ Assert.assertTrue(canRequestNewContainers);
+ }
+
+ static class TestYarnService extends YarnService {
+ public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+ FileSystem fs, EventBus eventBus) throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config), getMockHelixAdmin());
+ }
+
+ private static HelixManager getMockHelixManager(Config config) {
+ HelixManager helixManager = Mockito.mock(HelixManager.class);
+ HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+ PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+ PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+ Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+ Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+ Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+ Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+ Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+ Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);
+
+ return helixManager;
+ }
+
+ private static HelixAdmin getMockHelixAdmin() {
+ HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+ Mockito.doNothing().when(helixAdmin).purgeOfflineInstances(Mockito.anyString(), Mockito.anyLong());
+ Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());
Review Comment:
These aren't used
Issue Time Tracking
-------------------
Worklog Id: (was: 844200)
Time Spent: 50m (was: 40m)
> Helix offline instance purging is not thread safe in the yarn service
> ---------------------------------------------------------------------
>
> Key: GOBBLIN-1781
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1781
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Andy Jiang
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Helix instances are purged during startup of the yarn service. This operation
> must be done without new helix instances being added or removed (i.e. the API
> call is not thread safe).
>
> The current implementation blocks the yarn service from allocating initial
> containers while the helix instance purging is enabled, but it does not
> prevent other external services from requesting containers through its public
> methods.
> The yarn service and the AutoScalingYarnManager start up concurrently, and
> it's possible that the AutoScalingYarnManager starts up before the yarn
> service has completely finished purging. This leads the
> AutoScalingYarnManager to request containers while the instances are still
> being purged.
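
The gating behaviour described in the issue, and exercised by testYarnStartUpFirst in the PR, could be sketched roughly as follows. This is only an illustration under stated assumptions: StartupGatedService, its fields, and its integer "target" parameter are hypothetical names made up for this sketch, not the actual YarnService implementation. The idea is that the public request method refuses work (returns false) until startup, including the non-thread-safe purge, has completed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical stand-in for a service whose startup (including a purge that
 * is not thread safe) must finish before callers may request containers.
 * This is NOT the real Gobblin YarnService; all names are assumptions.
 */
final class StartupGatedService {
  // True until startUp() has finished; external requests are rejected meanwhile.
  private final AtomicBoolean startupInProgress = new AtomicBoolean(true);
  private int acceptedRequests = 0;

  /** Runs the startup work, then opens the gate for container requests. */
  void startUp() {
    purgeOfflineInstances();
    startupInProgress.set(false);
  }

  /**
   * Returns false without doing anything while startup is still in progress,
   * so a concurrently started caller cannot interleave with the purge.
   */
  synchronized boolean requestTargetNumberOfContainers(int target) {
    if (startupInProgress.get()) {
      return false;
    }
    acceptedRequests++;
    return true;
  }

  synchronized int getAcceptedRequests() {
    return acceptedRequests;
  }

  // Placeholder for the purge step; the real call would go to Helix.
  private void purgeOfflineInstances() {
  }
}
```

A caller that receives false would be expected to retry on its next scheduling cycle, which mirrors the assertFalse / startUp() / assertTrue sequence in the test above.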
--
This message was sent by Atlassian Jira
(v8.20.10#820010)