This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5339332d9 Add framework and unit tests for DagActionStoreChangeMonitor
(#3817)
5339332d9 is described below
commit 5339332d9c0d0651c7a1d3f8c07ea96a1bc6d4ad
Author: umustafi <[email protected]>
AuthorDate: Wed Nov 8 10:06:01 2023 -0800
Add framework and unit tests for DagActionStoreChangeMonitor (#3817)
* Add framework and unit tests for DagActionStoreChangeMonitor
* Add more test cases and validation
* Add header for new file
* Move FlowSpec static function to Utils class
* Remove unused import
* Fix compile error
* Fix unit tests
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/DagActionStoreChangeMonitorTest.java | 237 +++++++++++++++++++++
.../org/apache/gobblin/runtime/api/FlowSpec.java | 18 +-
.../apache/gobblin/runtime/api/FlowSpecTest.java | 4 +-
.../monitoring/DagActionStoreChangeMonitor.java | 11 +-
4 files changed, 254 insertions(+), 16 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
new file mode 100644
index 000000000..4025cf0db
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.net.URI;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link DagActionStoreChangeMonitor} to
process {@link DagActionStoreChangeEvent} type
+ * events stored in a {@link
org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to
gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitorTest {
+ public static final String TOPIC =
DagActionStoreChangeEvent.class.getSimpleName();
+ private final int PARTITION = 1;
+ private final int OFFSET = 1;
+ private final String FLOW_GROUP = "flowGroup";
+ private final String FLOW_NAME = "flowName";
+ private final String FLOW_EXECUTION_ID = "123";
+ private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
+ private int txidCounter = 0;
+
+ /**
+ * Note: The class methods are wrapped in test-specific methods because the
original methods are package-protected
+ * and cannot be accessed by this class.
+ */
+ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
+
+ public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads,
+ boolean isMultiActiveSchedulerEnabled) {
+ super(topic, config, mock(DagActionStore.class), mock(DagManager.class),
numThreads, mock(FlowCatalog.class),
+ mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
+ }
+
+ protected void processMessageForTest(DecodeableKafkaRecord record) {
+ super.processMessage(record);
+
+ }
+
+ protected void startUpForTest() {
+ super.startUp();
+ }
+ }
+
+ MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() {
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true);
+ }
+
+ // Called at start of every test so the count of each method being called is
reset to 0
+ public void setup() {
+ mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
+ mockDagActionStoreChangeMonitor.startUpForTest();
+ }
+
+ /**
+ * Ensure no NPE results from passing a HEARTBEAT type message with a null
{@link DagActionValue}, and that the message is
+ * filtered out (since it is a heartbeat type, no handler methods are called).
+ */
+ @Test
+ public void testProcessMessageWithHeartbeatAndNullDagAction() throws
SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "",
null);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ // Note: Indirectly verifies submitFlowToDagManagerHelper is called which
is not a mocked object so cannot be asserted
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Ensure a HEARTBEAT type message with non-empty flow information is
filtered out since it's a heartbeat type so no
+ * methods are called.
+ */
+ @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
+ public void testProcessMessageWithHeartbeatAndFlowInfo() throws
SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Tests process message with an INSERT type message of a `launch` action
+ */
+ @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo")
+ public void testProcessMessageWithInsertLaunchType() throws
SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(1)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Tests process message with an INSERT type message of a `resume` action.
It re-uses the same flow information, however
+ * since a different txId is used every time, the event will be considered
unique and submit a resume request.
+ */
+ @Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType")
+ public void testProcessMessageWithInsertResumeType() throws
SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Tests process message with an INSERT type message of a `kill` action.
Similar to `testProcessMessageWithInsertResumeType`.
+ */
+ @Test (dependsOnMethods = "testProcessMessageWithInsertResumeType")
+ public void testProcessMessageWithInsertKillType() throws
SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Tests process message with an UPDATE type message of the 'launch' action
above. Although processMessage does not
+ * expect this message type it should handle it gracefully
+ */
+ @Test (dependsOnMethods = "testProcessMessageWithInsertKillType")
+ public void testProcessMessageWithUpdate() throws SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Tests process message with a DELETE type message which should be ignored
regardless of the flow information.
+ */
+ @Test (dependsOnMethods = "testProcessMessageWithUpdate")
+ public void testProcessMessageWithDelete() throws SpecNotFoundException {
+ setup();
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+ mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+ verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
+ }
+
+ /**
+ * Util to create a general DagActionStoreChange type event
+ */
+ private DagActionStoreChangeEvent
createDagActionStoreChangeEvent(OperationType operationType,
+ String flowGroup, String flowName, String flowExecutionId,
DagActionValue dagAction) {
+ String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
+ GenericStoreChangeEvent genericStoreChangeEvent =
+ new GenericStoreChangeEvent(key, String.valueOf(txidCounter),
System.currentTimeMillis(), operationType);
+ txidCounter++;
+ return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, flowExecutionId, dagAction);
+ }
+
+ /**
+ * Form a key for events using the flow identifiers
+ * @return a key formed by adding an '_' delimiter between the flow
identifiers
+ */
+ private String getKeyForFlow(String flowGroup, String flowName, String
flowExecutionId) {
+ return flowGroup + "_" + flowName + "_" + flowExecutionId;
+ }
+
+ /**
+ * Util to create wrapper around DagActionStoreChangeEvent
+ */
+ private Kafka09ConsumerClient.Kafka09ConsumerRecord
wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup,
String flowName,
+ String flowExecutionId, DagActionValue dagAction) {
+ DagActionStoreChangeEvent eventToProcess = null;
+ try {
+ eventToProcess =
+ createDagActionStoreChangeEvent(operationType, flowGroup, flowName,
flowExecutionId, dagAction);
+ } catch (Exception e) {
+ log.error("Exception while creating event ", e);
+ }
+ // TODO: handle partition and offset values better
+ ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION,
OFFSET,
+ getKeyForFlow(flowGroup, flowName, flowExecutionId), eventToProcess);
+ return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 2df547596..7b887f472 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -518,15 +518,15 @@ public class FlowSpec implements Configurable, Spec {
return URI_SCHEME.length() + ":".length() // URI separator
+ URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH
+ URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
}
- }
- /**
- * Create a new FlowSpec object with the added property defined by path and
value parameters
- * @param path key for new property
- * @param value
- */
- public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String
path, String value) {
- Config updatedConfig = flowSpec.getConfig().withValue(path,
ConfigValueFactory.fromAnyRef(value));
- return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
+ /**
+ * Create a new FlowSpec object with the added property defined by path
and value parameters
+ * @param path key for new property
+ * @param value
+ */
+ public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec,
String path, String value) {
+ Config updatedConfig = flowSpec.getConfig().withValue(path,
ConfigValueFactory.fromAnyRef(value));
+ return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
+ }
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
index 793abd222..6ef5ca1cc 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
@@ -26,8 +26,6 @@ import org.apache.gobblin.service.FlowId;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.apache.gobblin.runtime.api.FlowSpec.*;
-
public class FlowSpecTest {
@@ -51,7 +49,7 @@ public class FlowSpecTest {
properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
"true");
FlowSpec originalFlowSpec =
FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
- FlowSpec updatedFlowSpec = createFlowSpecWithProperty(originalFlowSpec,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+ FlowSpec updatedFlowSpec =
FlowSpec.Utils.createFlowSpecWithProperty(originalFlowSpec,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
Properties updatedProperties = updatedFlowSpec.getConfigAsProperties();
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
flowExecutionId);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index c8d9c62b7..bbe1e47ab 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.monitoring;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -27,6 +28,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
@@ -42,8 +44,6 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
-import static org.apache.gobblin.runtime.api.FlowSpec.*;
-
/**
* A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent}
schema to process Kafka messages received
@@ -79,10 +79,13 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10,
TimeUnit.MINUTES).build(cacheLoader);
protected DagActionStore dagActionStore;
-
+ @Getter
+ @VisibleForTesting
protected DagManager dagManager;
protected Orchestrator orchestrator;
protected boolean isMultiActiveSchedulerEnabled;
+ @Getter
+ @VisibleForTesting
protected FlowCatalog flowCatalog;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
@@ -200,7 +203,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Adds flowExecutionId to config to ensure they are consistent across
hosts
- FlowSpec updatedSpec = createFlowSpecWithProperty(spec,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+ FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(spec,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
this.orchestrator.submitFlowToDagManager(updatedSpec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}",
flowId, e.getMessage());