This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d703b8f [GOBBLIN-1073] Add proxy user and requester quota to GaaS
d703b8f is described below
commit d703b8f5201dd343ea543d52af63eb68e38121f1
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Mar 12 21:26:12 2020 -0700
[GOBBLIN-1073] Add proxy user and requester quota to GaaS
Closes #2913 from jack-moseley/gaas-quota
---
.../service/modules/orchestration/DagManager.java | 112 ++++++++++++++++++++-
.../modules/orchestration/DagManagerUtils.java | 14 +++
.../modules/orchestration/DagManagerTest.java | 2 +-
3 files changed, 123 insertions(+), 5 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index fbee417..0187c8a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -123,6 +124,11 @@ public class DagManager extends AbstractIdleService {
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY =
JOB_STATUS_RETRIEVER_KEY + ".class";
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS =
FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX +
"dagStateStoreClass";
+ private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX +
"defaultJobQuota";
+ private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX +
"perUserQuota";
+
+ private static final String QUOTA_SEPERATOR = ":";
/**
* Action to be performed on a {@link Dag}, in case of a job failure.
Currently, we allow 2 modes:
@@ -165,6 +171,8 @@ public class DagManager extends AbstractIdleService {
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
+ private final int defaultQuota;
+ private final Map<String, Integer> perUserQuota;
private volatile boolean isActive = false;
@@ -183,6 +191,12 @@ public class DagManager extends AbstractIdleService {
this.eventSubmitter = Optional.absent();
}
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
+ this.perUserQuota = new HashMap<>();
+ for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
+ this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+
try {
this.jobStatusRetriever = createJobStatusRetriever(config);
} catch (ReflectiveOperationException e) {
@@ -328,7 +342,7 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore,
- queue[i], cancelQueue[i], instrumentationEnabled);
+ queue[i], cancelQueue[i], instrumentationEnabled, defaultQuota,
perUserQuota);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
@@ -362,6 +376,8 @@ public class DagManager extends AbstractIdleService {
*/
public static class DagManagerThread implements Runnable {
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>
jobToDag = new HashMap<>();
+ private static final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
+ private static final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new
HashMap<>();
@@ -371,6 +387,8 @@ public class DagManager extends AbstractIdleService {
private final MetricContext metricContext;
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
+ private final int defaultQuota;
+ private final Map<String, Integer> perUserQuota;
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
@@ -381,11 +399,14 @@ public class DagManager extends AbstractIdleService {
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore,
- BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, boolean instrumentationEnabled) {
+ BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, boolean instrumentationEnabled,
+ int defaultQuota, Map<String, Integer> perUserQuota) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.queue = queue;
this.cancelQueue = cancelQueue;
+ this.defaultQuota = defaultQuota;
+ this.perUserQuota = perUserQuota;
if (instrumentationEnabled) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
@@ -706,11 +727,12 @@ public class DagManager extends AbstractIdleService {
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
- String specExecutorUri =
dagNode.getValue().getSpecExecutor().getUri().toString();
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
// Run this spec on selected executor
SpecProducer producer = null;
try {
+ checkQuota(dagNode);
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) :
null;
@@ -747,6 +769,59 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+  * Counts the job of the given {@link DagNode} against the quota of its proxy user (if one is configured)
+  * and of every requester in its serialized requester list (if present), then fails if any of them is
+  * over quota.
+  *
+  * All counters are updated before anything is thrown, so that a quota failure for one user does not
+  * leave the job counts of the remaining users inconsistent.
+  *
+  * @param dagNode the node whose job is about to be submitted.
+  * @throws IOException if the proxy user or any requester exceeds its quota (the proxy user is reported
+  *         first), or if the requester list cannot be deserialized.
+  */
+ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
+   String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+   String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+   boolean proxyUserWithinQuota =
+       proxyUser == null || incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
+
+   // Message for the first requester found over quota; stays null while every requester is within quota.
+   String requesterFailure = null;
+   String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+   if (serializedRequesters != null) {
+     for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+       String requesterName = requester.getName();
+       // Always increment, even after an earlier requester already failed the check.
+       boolean withinQuota = incrementMapAndCheckQuota(requesterToJobCount, requesterName, dagNode);
+       if (!withinQuota && requesterFailure == null) {
+         requesterFailure = "Quota exceeded for requester " + requesterName + " on executor " + specExecutorUri
+             + ": quota=" + getQuotaForUser(requesterName) + ", runningJobs="
+             + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requesterName, dagNode));
+       }
+     }
+   }
+
+   // Quota errors are raised only now, after all counters have been updated.
+   if (!proxyUserWithinQuota) {
+     throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri
+         + ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs="
+         + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
+   }
+
+   if (requesterFailure != null) {
+     throw new IOException(requesterFailure);
+   }
+ }
+
+ /**
+  * Records one more running job for the given user in the given count map and reports whether the user
+  * is still within quota.
+  *
+  * The count is only incremented on the job's first attempt, since a job is considered running between
+  * retries; on later attempts the current count is checked as-is.
+  *
+  * The increment uses {@link Map#merge} instead of a separate get and put, so that it is a single atomic
+  * update when the map is a {@code ConcurrentHashMap} shared by several threads.
+  *
+  * @param quotaMap map from quota key (user and spec executor) to number of running jobs.
+  * @param user the proxy user or requester name to count the job against.
+  * @param dagNode the node whose job is being counted.
+  * @return true if the resulting job count does not exceed the quota for this user, false otherwise.
+  */
+ private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+   String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
+
+   int jobCount;
+   if (dagNode.getValue().getCurrentAttempts() == 1) {
+     // First attempt: atomically add one and read back the new count.
+     jobCount = quotaMap.merge(key, 1, Integer::sum);
+   } else {
+     // Retry: the job was already counted on its first attempt.
+     jobCount = quotaMap.getOrDefault(key, 0);
+   }
+
+   return jobCount <= getQuotaForUser(user);
+ }
+
+ /**
+  * Looks up the job quota for the given user.
+  *
+  * @param user the proxy user or requester name.
+  * @return the entry for this user in {@code perUserQuota} if there is one, otherwise {@code defaultQuota}.
+  */
+ private int getQuotaForUser(String user) {
+   Integer quota = perUserQuota.getOrDefault(user, defaultQuota);
+   return quota;
+ }
+
/**
* Method that defines the actions to be performed when a job finishes
either successfully or with failure.
* This method updates the state of the dag and performs clean up actions
as necessary.
@@ -762,6 +837,8 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId,
jobStatus.name());
+ releaseQuota(dagNode);
+
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).dec();
getRunningJobsCounterForUser(dagNode).forEach(counter ->
counter.dec());
@@ -785,6 +862,33 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+  * Decrement the running-job count by one for the proxy user and for each requester corresponding to the
+  * provided {@link DagNode}. Counts never go below zero, and users with no recorded count are left alone.
+  *
+  * If the requester list cannot be deserialized, the failure is logged and the requester counts are left
+  * unchanged; the proxy user count has already been released at that point.
+  *
+  * @param dagNode the node whose job has finished.
+  */
+ private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
+   String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+   if (proxyUser != null) {
+     decrementJobCount(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
+   }
+
+   String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+   if (serializedRequesters != null) {
+     try {
+       for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+         decrementJobCount(requesterToJobCount, DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+       }
+     } catch (IOException e) {
+       log.error("Failed to release quota for requester list " + serializedRequesters, e);
+     }
+   }
+ }
+
+ /**
+  * Decrements the count stored under {@code key} by one if it is present and positive.
+  *
+  * Uses {@link Map#computeIfPresent} so the check and the update are one atomic operation when the map
+  * is a {@code ConcurrentHashMap} shared by several threads, instead of separate containsKey/get/put calls.
+  */
+ private static void decrementJobCount(Map<String, Integer> jobCounts, String key) {
+   jobCounts.computeIfPresent(key, (k, count) -> count > 0 ? count - 1 : count);
+ }
+
private void deleteJobState(String dagId, DagNode<JobExecutionPlan>
dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
@@ -828,7 +932,7 @@ public class DagManager extends AbstractIdleService {
}
try {
- String serializedRequesters = ConfigUtils.getString(configs,
RequesterService.REQUESTER_LIST, null);
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
if (StringUtils.isNotEmpty(serializedRequesters)) {
List<ServiceRequester> requesters =
RequesterService.deserialize(serializedRequesters);
for (ServiceRequester requester : requesters) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 6bfd7de..5290444 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -44,6 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerUtils {
static long NO_SLA = -1L;
+ static String QUOTA_KEY_SEPERATOR = ",";
static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
@@ -220,6 +222,18 @@ public class DagManagerUtils {
return FailureOption.valueOf(failureOption);
}
+ /**
+  * Returns the string form of the URI of the spec executor held by the given node's {@link JobExecutionPlan}.
+  */
+ static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
+   JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+   return jobExecutionPlan.getSpecExecutor().getUri().toString();
+ }
+
+ /**
+  * Reads the value stored under {@link RequesterService#REQUESTER_LIST} in the config of the given node's
+  * job spec.
+  *
+  * @return the configured value, or null if the key is not set.
+  */
+ static String getSerializedRequesterList(DagNode<JobExecutionPlan> dagNode) {
+   JobSpec jobSpec = dagNode.getValue().getJobSpec();
+   return ConfigUtils.getString(jobSpec.getConfig(), RequesterService.REQUESTER_LIST, null);
+ }
+
+ /**
+  * Builds the key under which a user's running-job count is tracked: the user name, followed by
+  * {@code QUOTA_KEY_SEPERATOR}, followed by the spec executor URI of the given node.
+  */
+ static String getUserQuotaKey(String user, DagNode<JobExecutionPlan> dagNode) {
+   String specExecutorUri = getSpecExecutorUri(dagNode);
+   return user + QUOTA_KEY_SEPERATOR + specExecutorUri;
+ }
+
/**
* Increment the value of {@link JobExecutionPlan#currentAttempts}
*/
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c74314c..60f25d1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -78,7 +78,7 @@ public class DagManagerTest {
this.queue = new LinkedBlockingQueue<>();
this.cancelQueue = new LinkedBlockingQueue<>();
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue,
cancelQueue,
- true);
+ true, 5, new HashMap<>());
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);