This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ac5d5561c [GOBBLIN-2090] delete deadline triggers in all hosts (#3973)
ac5d5561c is described below
commit ac5d5561c507246f2820256d0ae2f1418ec761c9
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jun 17 11:58:07 2024 -0700
[GOBBLIN-2090] delete deadline triggers in all hosts (#3973)
* delete deadline triggers in all hosts
* fix tests
* add more tests
* address review comment
---
.../apache/gobblin/metastore/MysqlStateStore.java | 6 +-
.../runtime/DagActionStoreChangeMonitorTest.java | 2 +-
...gManagementDagActionStoreChangeMonitorTest.java | 149 ++++++++++++++++
.../runtime/KafkaAvroJobStatusMonitorTest.java | 197 +++++++++++++++------
.../spec_executorInstance/MockedSpecExecutor.java | 21 ++-
.../modules/orchestration/DagActionStore.java | 2 -
.../MostlyMySqlDagManagementStateStore.java | 12 +-
.../modules/orchestration/proc/DagProcUtils.java | 35 +++-
.../orchestration/proc/ReevaluateDagProc.java | 28 +--
.../modules/orchestration/proc/ResumeDagProc.java | 1 +
.../service/modules/spec/JobExecutionPlan.java | 1 +
.../service/monitoring/ChangeMonitorUtils.java | 1 -
.../monitoring/DagActionStoreChangeMonitor.java | 64 ++++---
.../DagManagementDagActionStoreChangeMonitor.java | 44 ++++-
...nagementDagActionStoreChangeMonitorFactory.java | 9 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 28 +--
.../DagManagementTaskStreamImplTest.java | 17 +-
.../orchestration/DagProcessingEngineTest.java | 16 +-
.../MostlyMySqlDagManagementStateStoreTest.java | 52 ++----
.../orchestration/MysqlDagStateStoreTest.java | 10 +-
.../modules/orchestration/OrchestratorTest.java | 15 +-
.../proc/EnforceDeadlineDagProcsTest.java | 8 +-
.../orchestration/proc/KillDagProcTest.java | 16 +-
.../orchestration/proc/LaunchDagProcTest.java | 54 ++++--
.../orchestration/proc/ReevaluateDagProcTest.java | 107 +++++------
.../orchestration/proc/ResumeDagProcTest.java | 46 ++---
26 files changed, 600 insertions(+), 341 deletions(-)
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 36ff51a56..aa8d00f5e 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -283,11 +283,7 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
queryStatement.setString(++index, tableName);
try (ResultSet rs = queryStatement.executeQuery()) {
- if (rs.next()) {
- return true;
- } else {
- return false;
- }
+ return rs.next();
}
} catch (SQLException e) {
throw new IOException("Failure checking existence of storeName " +
storeName + " tableName " + tableName, e);
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 2627f55a5..40c930ea4 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -264,7 +264,7 @@ public class DagActionStoreChangeMonitorTest {
* Form a key for events using the flow identifiers
* @return a key formed by adding an '_' delimiter between the flow
identifiers
*/
- private String getKeyForFlow(String flowGroup, String flowName, long
flowExecutionId) {
+ public static String getKeyForFlow(String flowGroup, String flowName, long
flowExecutionId) {
return flowGroup + "_" + flowName + "_" + flowExecutionId;
}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
new file mode 100644
index 000000000..b93d7147f
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.quartz.SchedulerException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link
DagManagementDagActionStoreChangeMonitor} to process {@link
DagActionStoreChangeEvent} type
+ * events stored in a {@link
org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to
gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitorTest {
+ public static final String TOPIC =
DagActionStoreChangeEvent.class.getSimpleName();
+ private final int PARTITION = 1;
+ private final int OFFSET = 1;
+ private final String FLOW_GROUP = "flowGroup";
+ private final String FLOW_NAME = "flowName";
+ private final long FLOW_EXECUTION_ID = 123L;
+ private final String JOB_NAME = "jobName";
+ private MockDagManagementDagActionStoreChangeMonitor
mockDagManagementDagActionStoreChangeMonitor;
+ private int txidCounter = 0;
+
+ private static final DagActionReminderScheduler dagActionReminderScheduler =
mock(DagActionReminderScheduler.class);
+
+ /**
+ * Note: The class methods are wrapped in a test-specific method because the
original methods are protected
+ * and cannot be called directly by this test class.
+ */
+ static class MockDagManagementDagActionStoreChangeMonitor extends
DagManagementDagActionStoreChangeMonitor {
+
+ public MockDagManagementDagActionStoreChangeMonitor(Config config, int
numThreads, boolean isMultiActiveSchedulerEnabled) {
+ super(config, numThreads, mock(FlowCatalog.class),
mock(Orchestrator.class), mock(DagManagementStateStore.class),
+ isMultiActiveSchedulerEnabled, mock(DagManagement.class),
dagActionReminderScheduler);
+ }
+ protected void processMessageForTest(DecodeableKafkaRecord<String,
DagActionStoreChangeEvent> record) {
+ super.processMessage(record);
+ }
+ }
+
+ MockDagManagementDagActionStoreChangeMonitor
createMockDagManagementDagActionStoreChangeMonitor() {
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ return new MockDagManagementDagActionStoreChangeMonitor(config, 5, true);
+ }
+
+ // Called before every test to create a fresh monitor; note that the shared
static dagActionReminderScheduler mock is not reset between tests
+ @BeforeMethod
+ public void setupMockMonitor() {
+ mockDagManagementDagActionStoreChangeMonitor =
createMockDagManagementDagActionStoreChangeMonitor();
+ }
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any());
+
+ }
+
+ /**
+ * Tests that a DELETE message unschedules the dag action's reminder job.
+ */
+ @Test
+ public void testProcessMessageWithDelete() throws SchedulerException {
+ Kafka09ConsumerClient.Kafka09ConsumerRecord<String,
DagActionStoreChangeEvent> consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
DagActionValue.ENFORCE_JOB_START_DEADLINE);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
+ DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(),
times(1))
+ .unscheduleReminderJob(eq(dagAction));
+ }
+
+ /**
+ * Util to create a general DagActionStoreChange type event
+ */
+ private DagActionStoreChangeEvent
createDagActionStoreChangeEvent(OperationType operationType,
+ String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionValue dagAction) {
+ String key = DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup,
flowName, flowExecutionId);
+ GenericStoreChangeEvent genericStoreChangeEvent =
+ new GenericStoreChangeEvent(key, String.valueOf(txidCounter),
System.currentTimeMillis(), operationType);
+ txidCounter++;
+ return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, String.valueOf(flowExecutionId),
+ jobName, dagAction);
+ }
+
+ /**
+ * Util to create wrapper around DagActionStoreChangeEvent
+ */
+ private Kafka09ConsumerClient.Kafka09ConsumerRecord<String,
DagActionStoreChangeEvent> wrapDagActionStoreChangeEvent(
+ OperationType operationType, String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionValue dagAction) {
+ DagActionStoreChangeEvent eventToProcess = null;
+ try {
+ eventToProcess =
+ createDagActionStoreChangeEvent(operationType, flowGroup, flowName,
flowExecutionId, jobName, dagAction);
+ } catch (Exception e) {
+ log.error("Exception while creating event ", e);
+ }
+ // TODO: handle partition and offset values better
+ ConsumerRecord<String, DagActionStoreChangeEvent> consumerRecord = new
ConsumerRecord<>(TOPIC, PARTITION, OFFSET,
+ DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup, flowName,
flowExecutionId), eventToProcess);
+ return new Kafka09ConsumerClient.Kafka09ConsumerRecord<>(consumerRecord);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index b1496e5d4..f03a7ff4b 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -29,9 +29,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -72,9 +72,10 @@ import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueReposi
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
-import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -84,6 +85,9 @@ import
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProduc
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -91,18 +95,17 @@ public class KafkaAvroJobStatusMonitorTest {
public static final String TOPIC =
KafkaAvroJobStatusMonitorTest.class.getSimpleName();
private KafkaTestBase kafkaTestHelper;
- private String flowGroup = "myFlowGroup";
- private String flowName = "myFlowName";
- private String jobGroup = "myJobGroup";
- private String jobName = "myJobName";
- private long flowExecutionId = 1234L;
- private String jobExecutionId = "1111";
- private String message = "https://myServer:8143/1234/1111";
- private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
+ private final String flowGroup = "myFlowGroup";
+ private final String flowName = "myFlowName";
+ private final String jobGroup = "myJobGroup";
+ private final String jobName = "myJobName";
+ private final long flowExecutionId = 1234L;
+ private final String jobExecutionId = "1111";
+ private final String message = "https://myServer:8143/1234/1111";
+ private final String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
- private DagManagementStateStore dagManagementStateStore;
- private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+ DagActionStore.DagAction enforceJobStartDeadlineDagAction;
@BeforeClass
public void setUp() throws Exception {
@@ -112,7 +115,7 @@ public class KafkaAvroJobStatusMonitorTest {
kafkaTestHelper.provisionTopic(TOPIC);
// Create KeyValueProducerPusher instance.
- Pusher pusher = new KafkaKeyValueProducerPusher<byte[],
byte[]>("localhost:dummy", TOPIC,
+ Pusher<Pair<byte[], byte[]>> pusher = new
KafkaKeyValueProducerPusher<>("localhost:dummy", TOPIC,
Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
this.kafkaTestHelper.getKafkaServerPort()))));
@@ -121,12 +124,14 @@ public class KafkaAvroJobStatusMonitorTest {
builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
builder =
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
- this.dagManagementStateStore = mock(DagManagementStateStore.class);
- this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
+
+ this.enforceJobStartDeadlineDagAction = new
DagActionStore.DagAction(this.flowGroup, this.flowName, this.flowExecutionId,
this.jobName,
+ DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
}
@Test
public void testProcessMessageForSuccessfulFlow() throws IOException,
ReflectiveOperationException {
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic1");
//Submit GobblinTrackingEvents to Kafka
@@ -148,24 +153,36 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSJobObservabilityEventProducer());
+ new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
// (per above, is a 'dummy' event)
Assert.assertNull(jobStatusMonitor.parseJobStatus(
@@ -180,6 +197,7 @@ public class KafkaAvroJobStatusMonitorTest {
@Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
public void testProcessMessageForFailedFlow() throws IOException,
ReflectiveOperationException {
+ DagManagementStateStore dagManagementStateStore =
mock(MostlyMySqlDagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic2");
//Submit GobblinTrackingEvents to Kafka
@@ -206,7 +224,8 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+ ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(),
dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -215,15 +234,15 @@ public class KafkaAvroJobStatusMonitorTest {
// Verify undecodeable message is skipped
byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
messageAndMetadata.message().length - 1);
- ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC,
messageAndMetadata.partition(),
+ ConsumerRecord<byte[], byte[]> undecodeableRecord = new
ConsumerRecord<>(TOPIC, messageAndMetadata.partition(),
messageAndMetadata.offset(), messageAndMetadata.key(),
undecodeableMessage);
Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
0L);
- jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+ jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord<>(undecodeableRecord));
Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
1L);
// Re-test when properly encoded, as expected for a normal event
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
- StateStore stateStore = jobStatusMonitor.getStateStore();
+ StateStore<State> stateStore = jobStatusMonitor.getStateStore();
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
List<State> stateList = stateStore.getAll(storeName, tableName);
@@ -231,40 +250,59 @@ public class KafkaAvroJobStatusMonitorTest {
State state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
iterator,
this::convertMessageAndMetadataToDecodableKafkaRecord);
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Because the maximum attempt is set to 2, the state is set to
PENDING_RETRY after the first failure
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//The retried job is running again, so the state is set to
RUNNING on the second attempt
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Because the maximum attempt is set to 2, the state is set to FAILED
after trying twice
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
jobStatusMonitor.shutDown();
}
@Test (dependsOnMethods = "testProcessMessageForFailedFlow")
public void testProcessMessageForSkippedFlow() throws IOException,
ReflectiveOperationException {
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic2");
//Submit GobblinTrackingEvents to Kafka
@@ -284,7 +322,7 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSJobObservabilityEventProducer());
+ new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -293,15 +331,15 @@ public class KafkaAvroJobStatusMonitorTest {
// Verify undecodeable message is skipped
byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
messageAndMetadata.message().length - 1);
- ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC,
messageAndMetadata.partition(),
+ ConsumerRecord<byte[], byte[]> undecodeableRecord = new
ConsumerRecord<>(TOPIC, messageAndMetadata.partition(),
messageAndMetadata.offset(), messageAndMetadata.key(),
undecodeableMessage);
Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
0L);
- jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+ jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord<>(undecodeableRecord));
Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
1L);
// Re-test when properly encoded, as expected for a normal event
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
- StateStore stateStore = jobStatusMonitor.getStateStore();
+ StateStore<State> stateStore = jobStatusMonitor.getStateStore();
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
List<State> stateList = stateStore.getAll(storeName, tableName);
@@ -309,20 +347,27 @@ public class KafkaAvroJobStatusMonitorTest {
State state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
iterator,
this::convertMessageAndMetadataToDecodableKafkaRecord);
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
jobStatusMonitor.shutDown();
}
@Test (dependsOnMethods = "testProcessMessageForSkippedFlow")
public void testProcessingRetriedForApparentlyTransientErrors() throws
IOException, ReflectiveOperationException {
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic3");
//Submit GobblinTrackingEvents to Kafka
@@ -344,10 +389,10 @@ public class KafkaAvroJobStatusMonitorTest {
Config conf = ConfigFactory.empty().withValue(
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." +
RETRY_MULTIPLIER,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
conf,
- new NoopGaaSJobObservabilityEventProducer());
+ new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
@@ -373,12 +418,17 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus()));
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+
toggleManagementExecutor.shutdownNow();
jobStatusMonitor.shutDown();
}
@Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
//Submit GobblinTrackingEvents to Kafka
@@ -402,43 +452,66 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+ ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(),
dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
// Received kill flow event, should not retry the flow even though there
is 1 pending attempt left
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
jobStatusMonitor.shutDown();
}
@Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
public void testProcessMessageForFlowPendingResume() throws IOException,
ReflectiveOperationException {
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
//Submit GobblinTrackingEvents to Kafka
@@ -461,34 +534,56 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+ ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(),
dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
// Job for flow pending resume status after it was cancelled or failed
state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA",
"NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RESUME.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).addJobDagAction(any(), any(),
+ anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
jobStatusMonitor.shutDown();
}
@@ -512,9 +607,9 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSJobObservabilityEventProducer());
+ new NoopGaaSJobObservabilityEventProducer(),
mock(DagManagementStateStore.class));
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
@@ -544,10 +639,10 @@ public class KafkaAvroJobStatusMonitorTest {
MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
issueRepository, false);
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- mockEventProducer);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+ ConfigFactory.empty(), mockEventProducer,
mock(DagManagementStateStore.class));
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
@@ -591,10 +686,10 @@ public class KafkaAvroJobStatusMonitorTest {
MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
issueRepository, false);
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- mockEventProducer);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+ ConfigFactory.empty(), mockEventProducer,
mock(DagManagementStateStore.class));
jobStatusMonitor.buildMetricsContextAndMetrics();
- Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
@@ -619,10 +714,10 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
- private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor
jobStatusMonitor, Iterator<DecodeableKafkaRecord> recordIterator,
+ private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor
jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>>
recordIterator,
String jobGroup, String jobName) throws IOException {
jobStatusMonitor.processMessage(recordIterator.next());
- StateStore stateStore = jobStatusMonitor.getStateStore();
+ StateStore<State> stateStore = jobStatusMonitor.getStateStore();
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, jobGroup,
jobName);
List<State> stateList = stateStore.getAll(storeName, tableName);
@@ -719,13 +814,15 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
- GaaSJobObservabilityEventProducer eventProducer) throws IOException,
ReflectiveOperationException {
+ GaaSJobObservabilityEventProducer eventProducer, DagManagementStateStore
dagManagementStateStore) throws IOException, ReflectiveOperationException {
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"))
+ .withValue(ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED,
ConfigValueFactory.fromAnyRef("true"))
.withFallback(additionalConfig);
- return new MockKafkaAvroJobStatusMonitor("test",config, 1,
shouldThrowFakeExceptionInParseJobStatusToggle, eventProducer);
+ return new MockKafkaAvroJobStatusMonitor("test", config, 1,
shouldThrowFakeExceptionInParseJobStatusToggle,
+ eventProducer, dagManagementStateStore);
}
/**
* Create a dummy event to test if it is filtered out by the consumer.
@@ -767,13 +864,12 @@ public class KafkaAvroJobStatusMonitorTest {
public void tearDown() {
try {
this.kafkaTestHelper.close();
- this.mockedGobblinServiceManager.close();
} catch(Exception e) {
System.err.println("Failed to close Kafka server.");
}
}
- class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
+ static class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor
{
private final AtomicBoolean shouldThrowFakeExceptionInParseJobStatus;
@Getter
private volatile int numFakeExceptionsFromParseJobStatus = 0;
@@ -782,14 +878,15 @@ public class KafkaAvroJobStatusMonitorTest {
* @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and
retain) to dial whether `parseJobStatus` throws
*/
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads,
- AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSJobObservabilityEventProducer producer)
+ AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSJobObservabilityEventProducer producer,
+ DagManagementStateStore dagManagementStateStore)
throws IOException, ReflectiveOperationException {
super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer, dagManagementStateStore);
shouldThrowFakeExceptionInParseJobStatus =
shouldThrowFakeExceptionInParseJobStatusToggle;
}
@Override
- protected void processMessage(DecodeableKafkaRecord record) {
+ protected void processMessage(DecodeableKafkaRecord<byte[], byte[]>
record) {
super.processMessage(record);
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 77d21c14d..7647b60e2 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -24,23 +24,28 @@ import java.util.concurrent.Future;
import org.mockito.Mockito;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+
+import lombok.EqualsAndHashCode;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
+@EqualsAndHashCode(of = "config", callSuper=false)
public class MockedSpecExecutor extends InMemorySpecExecutor {
- private SpecProducer<Spec> mockedSpecProducer;
+ private final SpecProducer<Spec> mockedSpecProducer;
+ private final Config config;
public MockedSpecExecutor(Config config) {
super(config);
+ this.config = config;
this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
when(mockedSpecProducer.addSpec(any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
@@ -49,9 +54,17 @@ public class MockedSpecExecutor extends InMemorySpecExecutor
{
}
public static SpecExecutor createDummySpecExecutor(URI uri) {
+ return new
MockedSpecExecutor(makeDummyConfigsForSpecExecutor(uri.toString()));
+ }
+
+ public static Config makeDummyConfigsForSpecExecutor(String specUriInString)
{
+ String specStoreDir = "/tmp/specStoreDir";
Properties properties = new Properties();
- properties.setProperty(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
uri.toString());
- return new MockedSpecExecutor(ConfigFactory.parseProperties(properties));
+ properties.put("specStore.fs.dir", specStoreDir);
+ properties.put("specExecInstance.capabilities", "source:destination");
+ properties.put(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
specUriInString);
+ properties.put("uri", specUriInString);
+ return ConfigUtils.propertiesToConfig(properties);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 3e0735bd3..2d22aa3a2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -31,14 +31,12 @@ import
org.apache.gobblin.service.modules.flowgraph.DagNodeId;
public interface DagActionStore {
public static final String NO_JOB_NAME_DEFAULT = "";
enum DagActionType {
- CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
ENFORCE_JOB_START_DEADLINE, // Enforce job start deadline
ENFORCE_FLOW_FINISH_DEADLINE, // Enforce flow finish deadline
KILL, // Kill invoked through API call
LAUNCH, // Launch new flow execution invoked adhoc or through scheduled
trigger
REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final
job status
RESUME, // Resume flow invoked through API call
- RETRY, // Invoked through DagManager for flows configured to allow retries
}
@Data
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 862c73f96..82504c729 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -74,7 +74,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
private final UserQuotaManager quotaManager;
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
- private static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
+ public static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
public static final String DAG_STATESTORE_CLASS_KEY =
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
FlowCatalog flowCatalog;
@Getter
@@ -194,8 +194,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan>
dagNode, DagManager.DagId dagId)
throws IOException {
- Optional<Dag<JobExecutionPlan>> dag = getDag(dagId);
- if (!dag.isPresent()) {
+ if (!containsDag(dagId)) {
throw new RuntimeException("Dag " + dagId + " not found");
}
this.dagNodes.put(dagNode.getValue().getId(), dagNode);
@@ -264,6 +263,13 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
}
}
+ /* todo - this method works because jobs are deleted from the DMSS when they
finish -> if no job is found, it means there are
+ no more running jobs.
+ But the DMSS still has dags, which still contain dag nodes. We need to
revisit this method's logic when we change
+ DMSS to a fully MySQL-backed implementation. Then we may want to consider
this approach:
+ return getDagNodes(dagId).stream()
+ .anyMatch(node ->
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
+ */
@Override
public boolean hasRunningJobs(DagManager.DagId dagId) {
return !getDagNodes(dagId).isEmpty();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index b52f361dc..b11bb4c1c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -45,6 +45,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -160,7 +161,7 @@ public class DagProcUtils {
try {
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
- Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
sendCancellationEvent(dagNodeToCancel.getValue());
@@ -210,4 +211,36 @@ public class DagProcUtils {
return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config,
DagManager.JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
}
+
+ public static boolean isJobLevelStatus(String jobName) {
+ return !jobName.equals(JobStatusRetriever.NA_KEY);
+ }
+
+ public static void
removeEnforceJobStartDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, String flowGroup,
+ String flowName, long flowExecutionId, String jobName) {
+ DagActionStore.DagAction enforceJobStartDeadlineDagAction = new
DagActionStore.DagAction(flowGroup, flowName,
+ flowExecutionId, jobName,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+ log.info("Deleting dag action {}", enforceJobStartDeadlineDagAction);
+ // todo - add metrics
+
+ try {
+
dagManagementStateStore.deleteDagAction(enforceJobStartDeadlineDagAction);
+ } catch (IOException e) {
+ log.warn("Failed to delete dag action {}",
enforceJobStartDeadlineDagAction);
+ }
+ }
+
+ public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, DagManager.DagId dagId) {
+ DagActionStore.DagAction enforceFlowFinishDeadlineDagAction =
DagActionStore.DagAction.forFlow(dagId.getFlowGroup(),
+ dagId.getFlowName(), dagId.getFlowExecutionId(),
+ DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+ log.info("Deleting dag action {}", enforceFlowFinishDeadlineDagAction);
+ // todo - add metrics
+
+ try {
+
dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction);
+ } catch (IOException e) {
+ log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
+ }
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 242290656..a57de4818 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -21,17 +21,13 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;
-import org.quartz.SchedulerException;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
-import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -83,7 +79,8 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
ExecutionStatus executionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
- setStatus(dagManagementStateStore, dagNode, executionStatus);
+ // pass the dag so that it is updated too; the updated information will be
required in onJobFinish to find the next jobs to submit
+ setStatus(dagManagementStateStore, dag, getDagNodeId(), executionStatus);
if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
@@ -120,7 +117,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
dagManagementStateStore.markDagFailed(dag);
}
- removeFlowFinishDeadlineTriggerAndDagAction(dagManagementStateStore);
+ DagProcUtils.removeFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagId());
}
}
@@ -129,9 +126,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
* todo - DMSS should support this functionality like an atomic get-and-set
operation.
*/
private void setStatus(DagManagementStateStore dagManagementStateStore,
- Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus)
throws IOException {
- Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
- DagNodeId dagNodeId = dagNode.getValue().getId();
+ Dag<JobExecutionPlan> dag, DagNodeId dagNodeId, ExecutionStatus
executionStatus) throws IOException {
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
if (node.getValue().getId().equals(dagNodeId)) {
node.getValue().setExecutionStatus(executionStatus);
@@ -177,19 +172,4 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
dagManagementStateStore.checkpointDag(dag);
dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
}
-
- private void
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore) {
- DagActionStore.DagAction enforceFlowFinishDeadlineDagAction =
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
- getDagNodeId().getFlowName(), getDagNodeId().getFlowExecutionId(),
- DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
- log.info("Deleting reminder trigger and dag action {}",
enforceFlowFinishDeadlineDagAction);
- // todo - add metrics
-
- try {
-
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(getDagTask().getDagAction());
-
dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction);
- } catch (SchedulerException | IOException e) {
- log.warn("Failed to unschedule the reminder for {}",
enforceFlowFinishDeadlineDagAction);
- }
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index ddc100ae8..68e02494e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -88,6 +88,7 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
// these two statements effectively move the dag from failed dag store to
(running) dag store.
// to prevent loss in the unlikely event of failure between the two, we
add first.
dagManagementStateStore.checkpointDag(failedDag.get());
+
// if it fails here, it will check point the failed dag in the (running)
dag store again, which is idempotent
dagManagementStateStore.deleteFailedDag(failedDag.get());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 72e545104..561ee2345 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
*/
@Data
@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts",
"jobFuture", "flowStartTime"})
+// todo - consider excluding SpecExecutor from EqualsAndHashCode or only
including DagNodeId
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
public static final String JOB_PROPS_KEY = "job.props";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index 9e121c8b7..c74b3ffe0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -26,7 +26,6 @@ import org.apache.gobblin.metrics.ContextAwareMeter;
@Slf4j
public final class ChangeMonitorUtils {
private ChangeMonitorUtils() {
- return;
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 5706ad9cc..a179811a4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -59,7 +59,7 @@ import
org.apache.gobblin.service.modules.orchestration.Orchestrator;
* connector between the API and execution layers of GaaS.
*/
@Slf4j
-public class DagActionStoreChangeMonitor extends HighLevelConsumer {
+public class DagActionStoreChangeMonitor extends HighLevelConsumer<String,
DagActionStoreChangeEvent> {
public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX =
"dagActionChangeStore";
// Metrics
@@ -70,13 +70,13 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
private ContextAwareMeter successfulLaunchSubmissionsOnStartup;
private ContextAwareMeter failedLaunchSubmissionsOnStartup;
protected ContextAwareMeter unexpectedErrors;
- private ContextAwareMeter messageProcessedMeter;
- private ContextAwareMeter duplicateMessagesMeter;
- private ContextAwareMeter heartbeatMessagesMeter;
- private ContextAwareMeter nullDagActionTypeMessagesMeter;
+ protected ContextAwareMeter messageProcessedMeter;
+ protected ContextAwareMeter duplicateMessagesMeter;
+ protected ContextAwareMeter heartbeatMessagesMeter;
+ protected ContextAwareMeter nullDagActionTypeMessagesMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
- private volatile Long produceToConsumeDelayValue = -1L;
+ protected volatile Long produceToConsumeDelayValue = -1L;
protected LaunchSubmissionMetricProxy ON_STARTUP = new
NullLaunchSubmissionMetricProxy();
protected LaunchSubmissionMetricProxy POST_STARTUP = new
NullLaunchSubmissionMetricProxy();
@@ -188,11 +188,11 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
This class is multithreaded and this method will be called by multiple
threads, however any given message will be
partitioned and processed by only one thread (and corresponding queue).
*/
- protected void processMessage(DecodeableKafkaRecord message) {
+ protected void processMessage(DecodeableKafkaRecord<String,
DagActionStoreChangeEvent> message) {
// This will also include the healthCheck message so that we can rely on
this to monitor the health of this Monitor
messageProcessedMeter.mark();
- String key = (String) message.getKey();
- DagActionStoreChangeEvent value = (DagActionStoreChangeEvent)
message.getValue();
+ String key = message.getKey();
+ DagActionStoreChangeEvent value = message.getValue();
String tid = value.getChangeEventIdentifier().getTxId();
Long produceTimestamp =
value.getChangeEventIdentifier().getProduceTimestampMillis();
@@ -224,31 +224,41 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType);
+ handleDagAction(operation, dagAction, flowGroup, flowName,
flowExecutionId, dagActionType);
+
+ dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
+ }
+
+ protected void handleDagAction(String operation, DagActionStore.DagAction
dagAction, String flowGroup, String flowName,
+ long flowExecutionId, DagActionStore.DagActionType dagActionType) {
// We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
// {@link DagActionStore.DagActionType} flow requests that have to be
processed. DELETEs require no action.
try {
- if (operation.equals("INSERT")) {
- handleDagAction(dagAction, false);
- } else if (operation.equals("UPDATE")) {
- // TODO: change this warning message and process updates if for launch
or reevaluate type
- log.warn("Received an UPDATE action to the DagActionStore when values
in this store are never supposed to be "
- + "updated. Flow group: {} name {} executionId {} were updated to
action {}", flowGroup, flowName,
- flowExecutionId, dagActionType);
- this.unexpectedErrors.mark();
- } else if (operation.equals("DELETE")) {
- log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
- } else {
- log.warn("Received unsupported change type of operation {}. Expected
values to be in [INSERT, UPDATE, DELETE]",
- operation);
- this.unexpectedErrors.mark();
- return;
+ switch (operation) {
+ case "INSERT":
+ handleDagAction(dagAction, false);
+ break;
+ case "UPDATE":
+ // TODO: change this warning message and process updates if for
launch or reevaluate type
+ log.warn("Received an UPDATE action to the DagActionStore when
values in this store are never supposed to be "
+ + "updated. Flow group: {} name {} executionId {} were
updated to action {}", flowGroup, flowName,
+ flowExecutionId, dagActionType);
+ this.unexpectedErrors.mark();
+ break;
+ case "DELETE":
+ log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
+ break;
+ default:
+ log.warn(
+ "Received unsupported change type of operation {}. Expected
values to be in [INSERT, UPDATE, DELETE]",
+ operation);
+ this.unexpectedErrors.mark();
+ break;
}
} catch (Exception e) {
- log.warn("Ran into unexpected error processing DagActionStore changes:
{}", e);
+ log.warn("Ran into unexpected error processing DagActionStore changes:
", e);
this.unexpectedErrors.mark();
}
-
- dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index ab863974c..a411ae1d0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -19,11 +19,14 @@ package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -37,16 +40,55 @@ import
org.apache.gobblin.service.modules.orchestration.Orchestrator;
@Slf4j
public class DagManagementDagActionStoreChangeMonitor extends
DagActionStoreChangeMonitor {
private final DagManagement dagManagement;
+ @VisibleForTesting @Getter
+ private final DagActionReminderScheduler dagActionReminderScheduler;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagManagementDagActionStoreChangeMonitor(Config config, int
numThreads,
FlowCatalog flowCatalog, Orchestrator orchestrator,
DagManagementStateStore dagManagementStateStore,
- boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+ boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement,
DagActionReminderScheduler dagActionReminderScheduler) {
// DagManager is only needed in the `handleDagAction` method of its parent
class and not needed in this class,
// so we are passing a null value for DagManager to its parent class.
super("", config, null, numThreads, flowCatalog, orchestrator,
dagManagementStateStore, isMultiActiveSchedulerEnabled);
this.dagManagement = dagManagement;
+ this.dagActionReminderScheduler = dagActionReminderScheduler;
+ }
+
+ @Override
+ protected void handleDagAction(String operation, DagActionStore.DagAction
dagAction, String flowGroup, String flowName,
+ long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+ // We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
+ // {@link DagActionStore.DagActionType} flow requests that have to be
processed.
+ try {
+ switch (operation) {
+ case "INSERT":
+ handleDagAction(dagAction, false);
+ break;
+ case "UPDATE":
+ log.warn("Received an UPDATE action to the DagActionStore when
values in this store are never supposed to be "
+ + "updated. Flow group: {} name {} executionId {} were
updated to action {}", flowGroup, flowName,
+ flowExecutionId, dagActionType);
+ this.unexpectedErrors.mark();
+ break;
+ case "DELETE":
+ log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
+ if (dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
+ || dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+ this.dagActionReminderScheduler.unscheduleReminderJob(dagAction);
+ }
+ break;
+ default:
+ log.warn(
+ "Received unsupported change type of operation {}. Expected
values to be in [INSERT, UPDATE, DELETE]",
+ operation);
+ this.unexpectedErrors.mark();
+ break;
+ }
+ } catch (Exception e) {
+ log.warn("Ran into unexpected error processing DagActionStore changes:
", e);
+ this.unexpectedErrors.mark();
+ }
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index c5828575d..a55286e17 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -48,17 +49,20 @@ public class
DagManagementDagActionStoreChangeMonitorFactory implements Provider
private final DagManagementStateStore dagManagementStateStore;
private final boolean isMultiActiveSchedulerEnabled;
private final DagManagement dagManagement;
+ private final DagActionReminderScheduler dagActionReminderScheduler;
@Inject
public DagManagementDagActionStoreChangeMonitorFactory(Config config,
DagManager dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagManagementStateStore
dagManagementStateStore, DagManagement dagManagement,
- @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
+ @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled,
+ DagActionReminderScheduler dagActionReminderScheduler) {
this.config = Objects.requireNonNull(config);
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
this.dagManagement = dagManagement;
+ this.dagActionReminderScheduler = dagActionReminderScheduler;
}
private DagManagementDagActionStoreChangeMonitor
createDagActionStoreMonitor() {
@@ -68,7 +72,8 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
return new
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
- numThreads, flowCatalog, orchestrator, dagManagementStateStore,
isMultiActiveSchedulerEnabled, this.dagManagement);
+ numThreads, flowCatalog, orchestrator, dagManagementStateStore,
isMultiActiveSchedulerEnabled, this.dagManagement,
+ this.dagActionReminderScheduler);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 7caec9a4c..9b7e40695 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import org.quartz.SchedulerException;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
@@ -64,10 +63,9 @@ import
org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
-import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.retry.RetryerFactory;
@@ -223,17 +221,22 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
String storeName = jobStatusStoreName(flowGroup, flowName);
String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+ String status =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
if (updatedJobStatus.getRight() == NewState.FINISHED) {
this.eventProducer.emitObservabilityEvent(jobStatus);
}
- if (this.dagProcEngineEnabled) {
+ if (this.dagProcEngineEnabled &&
DagProcUtils.isJobLevelStatus(jobName)) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
// todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
} else if (updatedJobStatus.getRight() == NewState.RUNNING) {
- removeStartDeadlineTriggerAndDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
+
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
+ }
+ // in case the job is cancelled before it started, we need to
clean up its enforceJobStartDeadlineDagAction
+ if (status != null &&
ExecutionStatus.valueOf(status).equals(ExecutionStatus.CANCELLED)) {
+
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
}
}
@@ -260,21 +263,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
- private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore, String flowGroup,
- String flowName, long flowExecutionId, String jobName) {
- DagActionStore.DagAction enforceStartDeadlineDagAction = new
DagActionStore.DagAction(flowGroup, flowName,
- flowExecutionId, jobName,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
- log.info("Deleting reminder trigger and dag action {}",
enforceStartDeadlineDagAction);
- // todo - add metrics
-
- try {
-
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
- dagManagementStateStore.deleteDagAction(enforceStartDeadlineDagAction);
- } catch (SchedulerException | IOException e) {
- log.error("Failed to unschedule the reminder for {}",
enforceStartDeadlineDagAction);
- }
- }
-
/**
* It fills missing fields in job status and also merge the fields with the
existing job status in the state store.
* Merging is required because we do not want to lose the information sent
by other GobblinTrackingEvents.
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 205f02dbc..046366b24 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -18,9 +18,6 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
import org.junit.Assert;
@@ -34,7 +31,6 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -43,6 +39,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -61,22 +58,14 @@ public class DagManagementTaskStreamImplTest {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
this.testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
Config config = configBuilder.build();
- // Constructing TopologySpecMap.
- Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
- String specExecInstance = "mySpecExecutor";
- TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
- URI specExecURI = new URI(specExecInstance);
- topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config,
- null, null, null, mock(DagActionStore.class));
- dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index b3a924454..14dd338f6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -18,9 +18,6 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
@@ -39,7 +36,6 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
@@ -73,22 +69,14 @@ public class DagProcessingEngineTest {
Config config;
ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
config = configBuilder.build();
- // Constructing TopologySpecMap.
- Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
- String specExecInstance = "mySpecExecutor";
- TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
- URI specExecURI = new URI(specExecInstance);
- topologySpecMap.put(specExecURI, topologySpec);
- dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config,
null,
- null, null, dagActionStore);
- dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
doReturn(true).when(dagActionStore).deleteDagAction(any());
dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 970309cef..aaabf6a86 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -29,19 +29,16 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
-import com.zaxxer.hikari.HikariDataSource;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.MysqlStateStore;
-import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -62,7 +59,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
- private static final String TEST_TABLE = "quotas";
+ private static final String TEST_TABLE = "table";
+ public static String TEST_SPEC_EXECUTOR_URI = "mySpecExecutor";
@BeforeClass
public void setUp() throws Exception {
@@ -118,11 +116,11 @@ public class MostlyMySqlDagManagementStateStoreTest {
public static MostlyMySqlDagManagementStateStore
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
TestMysqlDagStateStore.class.getName())
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
+
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testMetastoreDatabase.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE +
1)
+
.addPrimitive(MostlyMySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+ + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE +
2);
Config config = configBuilder.build();
JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
JobStatus dummyJobStatus =
JobStatus.builder().flowName("fn").flowGroup("fg").jobGroup("fg").jobName("job0")
@@ -132,41 +130,13 @@ public class MostlyMySqlDagManagementStateStoreTest {
// Constructing TopologySpecMap.
Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
- String specExecInstance = "mySpecExecutor";
- TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
- URI specExecURI = new URI(specExecInstance);
+
+ TopologySpec topologySpec =
LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
+ URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
topologySpecMap.put(specExecURI, topologySpec);
MostlyMySqlDagManagementStateStore dagManagementStateStore =
new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever, mock(DagActionStore.class));
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
-
- /**
- * Only overwrite {@link #createStateStore(Config)} method to directly
return a mysqlStateStore
- * backed by mocked db.
- */
- public static class TestMysqlDagStateStore extends MysqlDagStateStore {
- public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
- super(config, topologySpecMap);
- }
-
- @Override
- protected StateStore<State> createStateStore(Config config) {
- try {
- String jdbcUrl =
config.getString(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
- HikariDataSource dataSource = new HikariDataSource();
-
-
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
- dataSource.setAutoCommit(false);
- dataSource.setJdbcUrl(jdbcUrl);
- dataSource.setUsername(TEST_USER);
- dataSource.setPassword(TEST_PASSWORD);
-
- return new MysqlStateStore<>(dataSource, TEST_DAG_STATE_STORE, false,
State.class);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index 59816742a..41a092c58 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -161,7 +162,7 @@ public class MysqlDagStateStoreTest {
* Only overwrite {@link #createStateStore(Config)} method to directly
return a mysqlStateStore
* backed by mocked db.
*/
- public class TestMysqlDagStateStore extends MysqlDagStateStore {
+ public static class TestMysqlDagStateStore extends MysqlDagStateStore {
public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
super(config, topologySpecMap);
}
@@ -170,7 +171,10 @@ public class MysqlDagStateStoreTest {
protected StateStore<State> createStateStore(Config config) {
try {
// Setting up mock DB
- String jdbcUrl = MysqlDagStateStoreTest.testDb.getJdbcUrl();
+ String jdbcUrl =
config.hasPath(ConfigurationKeys.STATE_STORE_DB_URL_KEY)
+ ? config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY)
+ : MysqlDagStateStoreTest.testDb.getJdbcUrl();
+ String tableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_DAG_STATE_STORE);
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
@@ -179,7 +183,7 @@ public class MysqlDagStateStoreTest {
dataSource.setUsername(TEST_USER);
dataSource.setPassword(TEST_PASSWORD);
- return new MysqlStateStore<>(dataSource, TEST_DAG_STATE_STORE, false,
State.class);
+ return new MysqlStateStore<>(dataSource, tableName, false,
State.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index dc2a1b40f..a32a211fc 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -41,8 +41,8 @@ import com.google.common.base.Optional;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
@@ -67,6 +67,7 @@ import org.apache.gobblin.util.PathUtils;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
public class OrchestratorTest {
@@ -130,20 +131,12 @@ public class OrchestratorTest {
this.mockDagManager = mock(DagManager.class);
Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
- Config config = ConfigBuilder.create()
-
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
-
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
this.testMetastoreDatabase.getJdbcUrl())
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE).build();
-
MostlyMySqlDagManagementStateStore dagManagementStateStore =
- new MostlyMySqlDagManagementStateStore(config, null, null, null,
mock(DagActionStore.class));
+
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
- FlowCompilationValidationHelper flowCompilationValidationHelper = new
FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton,
mock(UserQuotaManager.class), mockFlowStatusGenerator);
+ FlowCompilationValidationHelper flowCompilationValidationHelper = new
FlowCompilationValidationHelper(ConfigFactory.empty(),
sharedFlowMetricsSingleton, mock(UserQuotaManager.class),
mockFlowStatusGenerator);
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger),
mockFlowStatusGenerator,
Optional.absent(), sharedFlowMetricsSingleton,
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 0fb261515..ff05be247 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -86,7 +86,9 @@ public class EnforceDeadlineDagProcsTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
- .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
@@ -122,7 +124,9 @@ public class EnforceDeadlineDagProcsTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
message("Test
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index 0976f6c59..64efb2ed5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -37,7 +37,6 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
@@ -56,7 +55,6 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -70,9 +68,7 @@ public class KillDagProcTest {
public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
-
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
- doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
- doNothing().when(this.dagManagementStateStore).addDagNodeState(any(),
any());
+ LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@AfterClass(alwaysRun = true)
@@ -87,7 +83,10 @@ public class KillDagProcTest {
public void killDag() throws IOException, URISyntaxException,
InterruptedException {
long flowExecutionId = System.currentTimeMillis();
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
- 5, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg")));
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg"))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
@@ -125,7 +124,10 @@ public class KillDagProcTest {
public void killDagNode() throws IOException, URISyntaxException,
InterruptedException {
long flowExecutionId = System.currentTimeMillis();
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
- 5, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg")));
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg"))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
JobStatus
jobStatus =
JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 1e7580e7f..e57e09589 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -22,10 +22,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -42,14 +41,16 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -75,9 +76,7 @@ public class LaunchDagProcTest {
public void setUp() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
-
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
- doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
- doNothing().when(this.dagManagementStateStore).addDagNodeState(any(),
any());
+ mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@AfterClass(alwaysRun = true)
@@ -87,17 +86,18 @@ public class LaunchDagProcTest {
}
@Test
- public void launchDag() throws IOException, InterruptedException,
URISyntaxException, ExecutionException {
+ public void launchDag() throws IOException, InterruptedException,
URISyntaxException {
String flowGroup = "fg";
String flowName = "fn";
long flowExecutionId = 12345L;
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5",
ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
- .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
- SpecProducer<Spec> specProducer =
DagManagerUtils.getSpecProducer(dag.getNodes().get(0));
List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
LaunchDagProc launchDagProc = new LaunchDagProc(
new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0",
@@ -107,15 +107,10 @@ public class LaunchDagProcTest {
launchDagProc.process(this.dagManagementStateStore);
int numOfLaunchedJobs = 1; // = number of start nodes
- long addSpecCount = specProducers.stream()
- .mapToLong(p -> Mockito.mockingDetails(p)
- .getInvocations()
- .stream()
- .filter(a -> a.getMethod().getName().equals("addSpec"))
- .count())
- .sum();
- Mockito.verify(specProducer,
Mockito.times(numOfLaunchedJobs)).addSpec(any());
- Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+ specProducers.stream().skip(numOfLaunchedJobs) // separately verified
`specProducers.get(0)`
+ .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
.addFlowDagAction(any(), any(), anyLong(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
@@ -129,7 +124,9 @@ public class LaunchDagProcTest {
Dag<JobExecutionPlan> dag =
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5",
ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
- .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(
@@ -180,4 +177,23 @@ public class LaunchDagProcTest {
}
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
+
+ public static void mockDMSSCommonBehavior(DagManagementStateStore
dagManagementStateStore) throws IOException, SpecNotFoundException {
+
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+ }
+
+ public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
+ Config specExecConfig =
MockedSpecExecutor.makeDummyConfigsForSpecExecutor(specUriInString);
+ SpecExecutor specExecutorInstanceProducer = new
MockedSpecExecutor(specExecConfig);
+ TopologySpec.Builder topologySpecBuilder = TopologySpec
+ .builder(new
Path(specExecConfig.getString("specStore.fs.dir")).toUri())
+ .withConfig(specExecConfig)
+ .withDescription("test")
+ .withVersion("1")
+ .withSpecExecutor(specExecutorInstanceProducer);
+
+ return topologySpecBuilder.build();
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 057788173..bbf5874db 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.orchestration.proc;
-import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -38,14 +37,11 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
-import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -59,20 +55,16 @@ import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
public class ReevaluateDagProcTest {
private final long flowExecutionId = System.currentTimeMillis();
private final String flowGroup = "fg";
-
private ITestMetastoreDatabase testMetastoreDatabase;
private DagManagementStateStore dagManagementStateStore;
private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
- private DagActionReminderScheduler dagActionReminderScheduler;
@BeforeClass
public void setUpClass() throws Exception {
@@ -83,17 +75,7 @@ public class ReevaluateDagProcTest {
@BeforeMethod
public void setUp() throws Exception {
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
- mockDMSSCommonBehavior(dagManagementStateStore);
- this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
- this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler);
- }
-
- private void mockDMSSCommonBehavior(DagManagementStateStore
dagManagementStateStore) throws IOException, SpecNotFoundException {
-
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
- doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
- doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
- doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
- doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+ LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
}
@AfterClass(alwaysRun = true)
@@ -106,45 +88,43 @@ public class ReevaluateDagProcTest {
@Test
public void testOneNextJobToRun() throws Exception {
String flowName = "fn";
+ DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName,
flowExecutionId);
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
2, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
- JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
- message("Test
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
+ .flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
+
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ dagManagementStateStore.checkpointDag(dag);
- doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
- doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ /*
+ We cannot verify addSpec calls on the original spec producers, because
the spec producer objects change when a dag is
+ written to and read back from DMSS. So, to verify job submission by checking
mock invocations on spec producers, we
+ fix the dag (which includes the SpecProducers) by mocking the call that
reads a dag from DMSS.
+ */
+ doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus)))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ doReturn(Optional.of(dag)). when(dagManagementStateStore).getDag(dagId);
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
- long addSpecCount = specProducers.stream()
- .mapToLong(p -> Mockito.mockingDetails(p)
- .getInvocations()
- .stream()
- .filter(a -> a.getMethod().getName().equals("addSpec"))
- .count())
- .sum();
-
// next job is sent to spec producer
- Assert.assertEquals(addSpecCount, 1L);
+ Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
+ // there are two invocations: one after setting the status and the other
after sending the new job to the specProducer
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(2)).addDagNodeState(any(), any());
// current job's state is deleted
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
-
-
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
-
- // when there is no more job to run in re-evaluate dag proc, it deletes
enforce_flow_finish_dag_action also
-
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
// test when there does not exist a next job in the dag when the current
job's reevaluate dag action is processed
@@ -156,31 +136,36 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
- JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
- message("Test
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
+ .flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
+
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ dagManagementStateStore.checkpointDag(dag);
- doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
- doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+ Dag<JobExecutionPlan> mockedDag = DagManagerTest.buildDag("2",
flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+ // mock getDagNodeWithJobStatus() to return a dagNode with status completed
+
mockedDag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+ doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)),
Optional.of(jobStatus)))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
- long addSpecCount = specProducers.stream()
- .mapToLong(p -> Mockito.mockingDetails(p)
- .getInvocations()
- .stream()
- .filter(a -> a.getMethod().getName().equals("addSpec"))
- .count())
- .sum();
-
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
// no new job to launch for this one job flow
- Assert.assertEquals(addSpecCount, 0L);
+ specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
// current job's state is deleted
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
@@ -190,9 +175,6 @@ public class ReevaluateDagProcTest {
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
-
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
-
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
@@ -205,12 +187,12 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
- List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
-
- doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
- doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)),
Optional.empty()))
+ dagManagementStateStore.checkpointDag(dag);
+ doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.empty()))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
ReevaluateDagProc
@@ -231,7 +213,6 @@ public class ReevaluateDagProcTest {
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
- Mockito.verify(dagActionReminderScheduler,
Mockito.never()).unscheduleReminderJob(any());
}
@Test
@@ -242,12 +223,14 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
.jobName("job3").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ dagManagementStateStore.checkpointDag(dag);
- doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus)))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index c68bde24e..c84132c8a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -19,10 +19,7 @@ package
org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
@@ -35,15 +32,11 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
@@ -51,8 +44,6 @@ import
org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -64,9 +55,7 @@ public class ResumeDagProcTest {
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
-
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
- doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
- doNothing().when(this.dagManagementStateStore).addDagNodeState(any(),
any());
+ LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@AfterClass(alwaysRun = true)
@@ -80,38 +69,41 @@ public class ResumeDagProcTest {
This test creates a failed dag and launches a resume dag proc for it. It
then verifies that the next jobs are set to run.
*/
@Test
- public void resumeDag()
- throws IOException, URISyntaxException, ExecutionException,
InterruptedException {
+ public void resumeDag() throws IOException, URISyntaxException,
ExecutionException, InterruptedException {
long flowExecutionId = 12345L;
String flowGroup = "fg";
String flowName = "fn";
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
- 5, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
- .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
// simulate a failed dag in store
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-
doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
+ this.dagManagementStateStore.checkpointDag(dag);
+ // simulate it as a failed dag
+ this.dagManagementStateStore.markDagFailed(dag);
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.RESUME),
null, this.dagManagementStateStore));
resumeDagProc.process(this.dagManagementStateStore);
- SpecProducer<Spec> specProducer =
DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
- List<SpecProducer<Spec>> otherSpecProducers =
dag.getNodes().stream().map(node -> {
- try {
- return DagManagerUtils.getSpecProducer(node);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }).filter(sp -> specProducer != sp).collect(Collectors.toList());
int expectedNumOfResumedJobs = 1; // = number of resumed nodes
- Mockito.verify(specProducer,
Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
+ /* Only the current job should have run.
+ We cannot verify that addSpec was called on the spec producers, because
in resumeDagProc, a dag is stored in DMSS
+ and retrieved, hence it goes through serialization/deserialization; in
this process the SpecProducer objects in
+ a dag node are recreated and addSpec is called on the new objects.
+ Warning - in unit tests, a test dag is created in different ways
(e.g. DagManagerTest.buildDag() or DagTestUtils.buildDag());
+ different methods create different spec executors (e.g.
MockedSpecExecutor.createDummySpecExecutor() or
+ buildNaiveTopologySpec().getSpecExecutor(), respectively).
+ As a result, after serializing/deserializing the test dag, the
spec executor (and producer) type may change. */
+
Mockito.verify(this.dagManagementStateStore,
Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
- otherSpecProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
}
}