arjun4084346 commented on code in PR #3817:
URL: https://github.com/apache/gobblin/pull/3817#discussion_r1379405036
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -0,0 +1,154 @@
+package org.apache.gobblin.runtime;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link DagActionStoreChangeMonitor} to
process {@link DagActionStoreChangeEvent} type
+ * events stored in a {@link
org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to
gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitorTest {
+ public static final String TOPIC =
DagActionStoreChangeEvent.class.getSimpleName();
+ private final int PARTITION = 1;
+ private final int OFFSET = 1;
+ private final String FLOW_GROUP = "flowGroup";
+ private final String FLOW_NAME = "flowName";
+ private final String FLOW_EXECUTION_ID = "123";
+ private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
+ private int txidCounter = 0;
+
+ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
+
+ public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads,
+ boolean isMultiActiveSchedulerEnabled) {
+ super(topic, config, mock(DagActionStore.class), mock(DagManager.class),
numThreads, mock(FlowCatalog.class),
+ mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord record) {
+ super.processMessage(record);
+ }
+
+ @Override
+ protected void startUp() {
+ super.startUp();
+ }
+ }
+
+ MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() {
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true);
+ }
+
+ @BeforeClass
+ public void setup() {
+ mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
+ mockDagActionStoreChangeMonitor.startUp();
+ }
+
+ /**
+ * Ensure no NPE results from passing a HEARTBEAT type message with a null
{@link DagActionValue}
+ */
+ @Test
+ public void testProcessMessageWithHeartbeat() {
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "",
null);
+ mockDagActionStoreChangeMonitor.processMessage(consumerRecord);
+ }
+
+ /**
+ * Tests process message with an INSERT type message
+ */
+ @Test
+ public void testProcessMessageWithInsert() {
+ Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+ wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+ mockDagActionStoreChangeMonitor.processMessage(consumerRecord);
Review Comment:
Do we just need to test that `processMessage` returns without any errors, or
can we verify anything else too? I haven't looked at everything `processMessage` does.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]