This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0632fbe [GOBBLIN-1527] Refactor flowexecutions endpoint job status
state retrieval
0632fbe is described below
commit 0632fbe770a0074e1888ed8a80de0c8d487061ad
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Aug 30 17:25:27 2021 -0700
[GOBBLIN-1527] Refactor flowexecutions endpoint job status state retrieval
This is merely a refactoring of the job status state retrieval impl. used
by the existing 'flowexecutions' endpoint, to enable its reuse by a forthcoming
finder, to be added in a separate commit. Functionality should not change, and the
code should continue to conform to all existing unit tests.
---
.../apache/gobblin/metastore/MysqlStateStore.java | 14 +++---
.../service/monitoring/FlowStatusGenerator.java | 45 ++++++++---------
.../service/monitoring/JobStatusRetriever.java | 19 ++++++--
.../monitoring/MysqlJobStatusRetriever.java | 57 ++++++++++------------
4 files changed, 69 insertions(+), 66 deletions(-)
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 3e01344..5c79fa9 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -277,16 +277,15 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
OutputStream os = compressedValues ? new GZIPOutputStream(byteArrayOs)
: byteArrayOs;
DataOutputStream dataOutput = new DataOutputStream(os)) {
- int index = 0;
- insertStatement.setString(++index, storeName);
- insertStatement.setString(++index, tableName);
+ insertStatement.setString(1, storeName);
+ insertStatement.setString(2, tableName);
for (T state : states) {
addStateToDataOutputStream(dataOutput, state);
}
dataOutput.close();
- insertStatement.setBlob(++index, new
ByteArrayInputStream(byteArrayOs.toByteArray()));
+ insertStatement.setBlob(3, new
ByteArrayInputStream(byteArrayOs.toByteArray()));
insertStatement.executeUpdate();
connection.commit();
@@ -299,9 +298,8 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
public T get(String storeName, String tableName, String stateId) throws
IOException {
try (Connection connection = dataSource.getConnection();
PreparedStatement queryStatement =
connection.prepareStatement(SELECT_JOB_STATE_SQL)) {
- int index = 0;
- queryStatement.setString(++index, storeName);
- queryStatement.setString(++index, tableName);
+ queryStatement.setString(1, storeName);
+ queryStatement.setString(2, tableName);
try (ResultSet rs = queryStatement.executeQuery()) {
if (rs.next()) {
@@ -345,12 +343,12 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
queryStatement.setString(1, storeName);
queryStatement.setString(2, tableName);
execGetAllStatement(queryStatement, states);
+ return states;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new IOException("failure retrieving state from storeName " +
storeName + " tableName " + tableName, e);
}
- return states;
}
/**
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 909e856..87173ab 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -24,6 +24,8 @@ import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import javax.inject.Inject;
@@ -94,9 +96,10 @@ public class FlowStatusGenerator {
return matchingFlowStatuses;
}
- String executionStatusFromFlow = getExecutionStatus(flowStatus);
- if (executionStatusFromFlow.equals(executionStatus)) {
- matchingFlowStatuses.add(getFlowStatus(flowName, flowGroup,
flowStatus.getFlowExecutionId(), tag));
+ // defensively materialize, since `getExecutionStatus` advances
`Iterator` arg; else a match would require re-loading from store, re-filtering
by tag
+ List<JobStatus> jobStatuses =
Lists.newArrayList(flowStatus.getJobStatusIterator());
+ if
(getExecutionStatus(jobStatuses.iterator()).equals(executionStatus)) {
+ matchingFlowStatuses.add(new FlowStatus(flowName, flowGroup,
flowStatus.getFlowExecutionId(), jobStatuses.iterator()));
}
}
@@ -105,13 +108,12 @@ public class FlowStatusGenerator {
}
/**
- * Return the executionStatus of the given {@link FlowStatus}. Note that the
{@link FlowStatus#jobStatusIterator}
- * will have it's cursor moved forward by this.
+ * Return the (flow-level) {@link JobStatus} execution status. Note that
the `Iterator` advances internally.
*/
- private String getExecutionStatus(FlowStatus flowStatus) {
- List<JobStatus> jobStatuses =
Lists.newArrayList(flowStatus.getJobStatusIterator());
- jobStatuses =
jobStatuses.stream().filter(JobStatusRetriever::isFlowStatus).collect(Collectors.toList());
- return jobStatuses.isEmpty() ? "" : jobStatuses.get(0).getEventName();
+ private String getExecutionStatus(Iterator<JobStatus> jobStatusIterator) {
+ // lazily elaborate since no need to create collection when only
non-emptiness of interest
+ Iterator<JobStatus> jobFlowStatuses = Iterators.filter(jobStatusIterator,
JobStatusRetriever::isFlowStatus);
+ return jobFlowStatuses.hasNext() ? jobFlowStatuses.next().getEventName() :
"";
}
/**
@@ -124,20 +126,10 @@ public class FlowStatusGenerator {
* list only contains jobs matching the tag.
*/
public FlowStatus getFlowStatus(String flowName, String flowGroup, long
flowExecutionId, String tag) {
- FlowStatus flowStatus = null;
- Iterator<JobStatus> jobStatusIterator =
- jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator =
retainStatusOfAnyFlowOrJobMatchingTag(
+ jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId), tag);
- if (tag != null) {
- jobStatusIterator = Iterators.filter(jobStatusIterator, js ->
JobStatusRetriever.isFlowStatus(js) ||
- (js.getJobTag() != null && js.getJobTag().equals(tag)));
- }
-
- if (jobStatusIterator.hasNext()) {
- flowStatus = new FlowStatus(flowName, flowGroup, flowExecutionId,
jobStatusIterator);
- }
-
- return flowStatus;
+ return jobStatusIterator.hasNext() ? new FlowStatus(flowName, flowGroup,
flowExecutionId, jobStatusIterator) : null;
}
/**
@@ -173,4 +165,13 @@ public class FlowStatusGenerator {
String status = jobStatus.getEventName().toUpperCase();
return !FINISHED_STATUSES.contains(status);
}
+
+ /** @return all `jobStatuses` when `tag == null`; otherwise only those that represent a flow or a job tagged as `tag` */
+ private Iterator<JobStatus>
retainStatusOfAnyFlowOrJobMatchingTag(Iterator<JobStatus> jobStatuses, String
tag) {
+ Predicate<JobStatus> matchesTag = js ->
JobStatusRetriever.isFlowStatus(js) ||
+ (js.getJobTag() != null && js.getJobTag().equals(tag));
+ Predicate<JobStatus> p = tag == null ? Predicates.alwaysTrue() :
matchesTag;
+
+ return Iterators.filter(jobStatuses, p);
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 6a6c3a2..1d8044d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -80,16 +80,15 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
: getJobStatusesForFlowExecution(flowName, flowGroup,
latestExecutionId);
}
-
/**
*
* @param jobState instance of {@link State}
* @return deserialize {@link State} into a {@link JobStatus}.
*/
protected JobStatus getJobStatus(State jobState) {
- String flowGroup =
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowName =
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- long flowExecutionId =
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+ String flowGroup = getFlowGroup(jobState);
+ String flowName = getFlowName(jobState);
+ long flowExecutionId = getFlowExecutionId(jobState);
String jobName =
jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
String jobGroup =
jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
String jobTag =
jobState.getProp(TimingEvent.FlowEventConstants.JOB_TAG_FIELD);
@@ -125,6 +124,18 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).issues(issues).build();
}
+ protected final String getFlowGroup(State jobState) {
+ return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ }
+
+ protected final String getFlowName(State jobState) {
+ return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ }
+
+ protected final long getFlowExecutionId(State jobState) {
+ return
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+ }
+
public abstract StateStore<State> getStateStore();
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 6fc02ff..c3aac16 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -20,13 +20,10 @@ package org.apache.gobblin.service.monitoring;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
import com.typesafe.config.Config;
import javax.inject.Inject;
@@ -37,7 +34,6 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
@@ -46,6 +42,12 @@ import
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
*/
@Singleton
public class MysqlJobStatusRetriever extends JobStatusRetriever {
+
+ @FunctionalInterface
+ private interface SupplierThrowingIO<T> {
+ T get() throws IOException;
+ }
+
public static final String MYSQL_JOB_STATUS_RETRIEVER_PREFIX =
"mysqlJobStatusRetriever";
public static final String GET_LATEST_JOB_STATUS_METRIC =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestJobStatus");
@@ -67,15 +69,9 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
@Override
public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName,
String flowGroup, long flowExecutionId) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
- try {
- List<State> jobStatusStates;
- try (Timer.Context context =
this.metricContext.contextAwareTimer(GET_LATEST_FLOW_STATUS_METRIC).time()) {
- jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
- }
- return getJobStatuses(jobStatusStates);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ List<State> jobStatusStates = timeOpAndWrapIOException(() ->
this.stateStore.getAll(storeName, flowExecutionId),
+ GET_LATEST_FLOW_STATUS_METRIC);
+ return getJobStatuses(jobStatusStates);
}
@Override
@@ -83,30 +79,22 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
String jobName, String jobGroup) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
String tableName =
KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
- try {
- List<State> jobStatusStates;
- try (Timer.Context context =
this.metricContext.contextAwareTimer(GET_LATEST_JOB_STATUS_METRIC).time()) {
- jobStatusStates = this.stateStore.getAll(storeName, tableName);
- }
- return getJobStatuses(jobStatusStates);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ List<State> jobStatusStates = timeOpAndWrapIOException(() ->
this.stateStore.getAll(storeName, tableName),
+ GET_LATEST_JOB_STATUS_METRIC);
+ return getJobStatuses(jobStatusStates);
}
@Override
public List<Long> getLatestExecutionIdsForFlow(String flowName, String
flowGroup, int count) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ List<State> jobStatusStates = timeOpAndWrapIOException(() ->
this.stateStore.getAll(storeName),
+ GET_ALL_FLOW_STATUSES_METRIC);
+ return getLatestExecutionIds(jobStatusStates, count);
+ }
- try {
- List<State> jobStatusStates;
- try (Timer.Context context =
this.metricContext.contextAwareTimer(GET_ALL_FLOW_STATUSES_METRIC).time()) {
- jobStatusStates = this.stateStore.getAll(storeName);
- }
- List<Long> flowExecutionIds = jobStatusStates.stream().map(state ->
Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
- .collect(Collectors.toList());
- return ImmutableList.copyOf(Iterables.limit(new
TreeSet<>(flowExecutionIds).descendingSet(), count));
+ private List<State> timeOpAndWrapIOException(SupplierThrowingIO<List<State>>
states, String timerMetricName) {
+ try (Timer.Context context =
this.metricContext.contextAwareTimer(timerMetricName).time()) {
+ return states.get();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -115,4 +103,9 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
private Iterator<JobStatus> getJobStatuses(List<State> jobStatusStates) {
return jobStatusStates.stream().map(this::getJobStatus).iterator();
}
+
+ private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int count) { // @return up to `count` distinct flow execution IDs, greatest first
+ Iterator<Long> flowExecutionIds = jobStatusStates.stream().map(this::getFlowExecutionId).distinct().iterator(); // `distinct()`: every job's state repeats its flow execution ID (the replaced `TreeSet` impl. deduped likewise)
+ return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
+ }
}