arjun4084346 commented on code in PR #3817:
URL: https://github.com/apache/gobblin/pull/3817#discussion_r1379391670
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -0,0 +1,154 @@
+package org.apache.gobblin.runtime;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link DagActionStoreChangeMonitor} to
process {@link DagActionStoreChangeEvent} type
+ * events stored in a {@link
org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to
gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitorTest {
+ public static final String TOPIC =
DagActionStoreChangeEvent.class.getSimpleName();
+ private final int PARTITION = 1;
+ private final int OFFSET = 1;
+ private final String FLOW_GROUP = "flowGroup";
+ private final String FLOW_NAME = "flowName";
+ private final String FLOW_EXECUTION_ID = "123";
+ private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
+ private int txidCounter = 0;
+
+ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
+
+ public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads,
+ boolean isMultiActiveSchedulerEnabled) {
+ super(topic, config, mock(DagActionStore.class), mock(DagManager.class),
numThreads, mock(FlowCatalog.class),
+ mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord record) {
+ super.processMessage(record);
Review Comment:
Why override this method when all it does is call the superclass's method? The
same applies to `startUp`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]