This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3822629 [GOBBLIN-1549] Add retries to KafkaJobStatusMonitor for
transient JobStatus state store failure. (#3399)
3822629 is described below
commit 3822629ab4dc3aa6ac7a0d677ced191206d48bf9
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Sep 24 10:16:44 2021 -0700
[GOBBLIN-1549] Add retries to KafkaJobStatusMonitor for transient JobStatus
state store failure. (#3399)
At times, the state store may be unavailable due to transient connection
failure. Once that resolves, we would like Gobblin Tracking Events to be
processed w/o needing to restart the Kafka consumer for that to happen. Thus,
we continue retrying state store updates encapsulated within the message, so
that connection repair is automatic (when within the configurable retry window).
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../runtime/KafkaAvroJobStatusMonitorTest.java | 240 ++++++++++++++-------
.../service/monitoring/KafkaJobStatusMonitor.java | 90 ++++++--
.../apache/gobblin/util/retry/RetryerFactory.java | 40 ++--
4 files changed, 257 insertions(+), 115 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8dd68f3..c55b84c 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -882,6 +882,8 @@ public class ConfigurationKeys {
public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP =
"kakfa.source.avgFetchTimeCap";
public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100;
public static final String SHARED_KAFKA_CONFIG_PREFIX =
"gobblin.kafka.sharedConfig";
+ public static final String KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES =
+ "gobblin.kafka.jobStatusMonitor.retry.timeOut.minutes";
/**
* Kafka schema registry HTTP client configuration
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index f0b5d23..ae26d15 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -16,13 +16,21 @@
*/
package org.apache.gobblin.runtime;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.testng.Assert;
@@ -42,6 +50,8 @@ import com.typesafe.config.ConfigValueFactory;
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.kafka.KafkaTestBase;
@@ -103,29 +113,17 @@ public class KafkaAvroJobStatusMonitorTest {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic1");
//Submit GobblinTrackingEvents to Kafka
- GobblinTrackingEvent event1 = createFlowCompiledEvent();
- context.submitEvent(event1);
- kafkaReporter.report();
-
- GobblinTrackingEvent event2 = createJobOrchestratedEvent(1);
- context.submitEvent(event2);
- kafkaReporter.report();
-
- GobblinTrackingEvent event3 = createJobStartEvent();
- context.submitEvent(event3);
- kafkaReporter.report();
-
- GobblinTrackingEvent event4 = createJobSucceededEvent();
- context.submitEvent(event4);
- kafkaReporter.report();
-
- GobblinTrackingEvent event5 = createDummyEvent();
- context.submitEvent(event5);
- kafkaReporter.report();
-
- GobblinTrackingEvent event6 = createJobStartEvent();
- context.submitEvent(event6);
- kafkaReporter.report();
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobStartEvent(),
+ createJobSucceededEvent(),
+ createDummyEvent(), // note position
+ createJobStartEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
try {
Thread.sleep(1000);
@@ -140,10 +138,11 @@ public class KafkaAvroJobStatusMonitorTest {
MockKafkaAvroJobStatusMonitor jobStatusMonitor = new
MockKafkaAvroJobStatusMonitor("test",config, 1);
jobStatusMonitor.buildMetricsContextAndMetrics();
- ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
- MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
StateStore stateStore = jobStatusMonitor.getStateStore();
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
@@ -153,8 +152,7 @@ public class KafkaAvroJobStatusMonitorTest {
State state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
stateList = stateStore.getAll(storeName, tableName);
@@ -162,29 +160,26 @@ public class KafkaAvroJobStatusMonitorTest {
state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
- messageAndMetadata = iterator.next();
+ // (per above, is a 'dummy' event)
Assert.assertNull(jobStatusMonitor.parseJobStatus(
-
jobStatusMonitor.deserializeEvent(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata))));
+ jobStatusMonitor.deserializeEvent(recordIterator.next())));
// Check that state didn't get set to running since it was already complete
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -199,38 +194,22 @@ public class KafkaAvroJobStatusMonitorTest {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic2");
//Submit GobblinTrackingEvents to Kafka
- GobblinTrackingEvent event1 = createFlowCompiledEvent();
- context.submitEvent(event1);
- kafkaReporter.report();
-
- //set maximum attempt to 2, and current attempt to 1
- GobblinTrackingEvent event2 = createJobOrchestratedEvent(1);
- context.submitEvent(event2);
- kafkaReporter.report();
-
- GobblinTrackingEvent event3 = createJobStartEvent();
- context.submitEvent(event3);
- kafkaReporter.report();
-
- GobblinTrackingEvent event4 = createJobFailedEvent();
- context.submitEvent(event4);
- kafkaReporter.report();
-
- //Mimic retrying - job orchestration
- //set maximum attempt to 2, and current attempt to 2
- GobblinTrackingEvent event5 = createJobOrchestratedEvent(2);
- context.submitEvent(event5);
- kafkaReporter.report();
-
- //Mimic retrying - job start (current attempt = 2)
- GobblinTrackingEvent event6 = createJobStartEvent();
- context.submitEvent(event6);
- kafkaReporter.report();
-
- //Mimic retrying - job failed again (current attempt = 2)
- GobblinTrackingEvent event7 = createJobFailedEvent();
- context.submitEvent(event7);
- kafkaReporter.report();
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobStartEvent(),
+ createJobFailedEvent(),
+ // Mimic retrying - job orchestration
+ // set maximum attempt to 2, and current attempt to 2
+ createJobOrchestratedEvent(2),
+ // Mimic retrying - job start (current attempt = 2)
+ createJobStartEvent(),
+ // Mimic retrying - job failed again (current attempt = 2)
+ createJobFailedEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
try {
Thread.sleep(1000);
@@ -256,7 +235,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
0L);
jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
1L);
- // Test an normal event
+ // Re-test when properly encoded, as expected for a normal event
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
StateStore stateStore = jobStatusMonitor.getStateStore();
@@ -267,8 +246,11 @@ public class KafkaAvroJobStatusMonitorTest {
State state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ iterator,
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ jobStatusMonitor.processMessage(recordIterator.next());
tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
stateList = stateStore.getAll(storeName, tableName);
@@ -276,16 +258,14 @@ public class KafkaAvroJobStatusMonitorTest {
state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -294,8 +274,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -303,8 +282,7 @@ public class KafkaAvroJobStatusMonitorTest {
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -312,8 +290,7 @@ public class KafkaAvroJobStatusMonitorTest {
//Because the maximum attempt is set to 2, so the state is set to
PENDING_RETRY after the first failure
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
- messageAndMetadata = iterator.next();
-
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+ jobStatusMonitor.processMessage(recordIterator.next());
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -325,6 +302,77 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
+ @Test
+ public void testProcessingRetriedForApparentlyTransientErrors() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic3");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1)
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+
+ AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new
AtomicBoolean(false);
+ int minNumFakeExceptionsExpected = 10;
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = new
MockKafkaAvroJobStatusMonitor(
+ "test", config, 1, shouldThrowFakeExceptionInParseJobStatusToggle);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ StateStore stateStore = jobStatusMonitor.getStateStore();
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+ List<State> stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ State state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ shouldThrowFakeExceptionInParseJobStatusToggle.set(true);
+ // since `processMessage` hereafter effectively hangs, launch eventual
re-toggling before calling again
+ ScheduledExecutorService toggleManagementExecutor =
Executors.newScheduledThreadPool(2);
+ toggleManagementExecutor.scheduleAtFixedRate(() -> {
+ if (jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus() >
minNumFakeExceptionsExpected) { // curtail faking: simulate resolution
+ shouldThrowFakeExceptionInParseJobStatusToggle.set(false);
+ }
+ }, 2, 2, TimeUnit.SECONDS);
+ Thread mainThread = Thread.currentThread();
+ // guardrail against excessive retries (befitting this unit test):
+ toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 60, 5,
TimeUnit.SECONDS);
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+
Assert.assertTrue(jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus() >
minNumFakeExceptionsExpected,
+ String.format("processMessage returned with only %d (faked)
exceptions",
+ jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus()));
+
+ tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ toggleManagementExecutor.shutdownNow();
+ jobStatusMonitor.shutDown();
+ }
+
private GobblinTrackingEvent createFlowCompiledEvent() {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
@@ -466,10 +514,23 @@ public class KafkaAvroJobStatusMonitorTest {
}
class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
+ private final AtomicBoolean shouldThrowFakeExceptionInParseJobStatus;
+ @Getter
+ private volatile int numFakeExceptionsFromParseJobStatus = 0;
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads)
throws IOException, ReflectiveOperationException {
+ this(topic, config, numThreads, new AtomicBoolean(false));
+ }
+
+ /**
+ * @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and
retain) to dial whether `parseJobStatus` throws
+ */
+ public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads,
+ AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle)
+ throws IOException, ReflectiveOperationException {
super(topic, config, numThreads, mock(JobIssueEventHandler.class));
+ shouldThrowFakeExceptionInParseJobStatus =
shouldThrowFakeExceptionInParseJobStatusToggle;
}
@Override
@@ -481,5 +542,22 @@ public class KafkaAvroJobStatusMonitorTest {
protected void buildMetricsContextAndMetrics() {
super.buildMetricsContextAndMetrics();
}
+
+ /**
+ * Overridden to stub potential exception within core processing of
`processMessage` (specifically retried portion).
+ * Although truly plausible (IO)Exceptions would originate from
`KafkaJobStatusMonitor.addJobStatusToStateStore`,
+ * that is `static`, so unavailable for override. The approach here is a
pragmatic compromise, being simpler than
+ * the alternative of writing a mock `StateStore.Factory` that the
`KafkaJobStatusMonitor` ctor could reflect,
+ * instantiate, and finally create a mock `StateStore` from.
+ */
+ @Override
+ public org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEvent event) {
+ if (shouldThrowFakeExceptionInParseJobStatus.get()) {
+ int n = ++numFakeExceptionsFromParseJobStatus;
+ throw new RuntimeException(String.format("BOOM! Failure [%d] w/ event
at %d", n, event.getTimestamp()));
+ } else {
+ return super.parseJobStatus(event);
+ }
+ }
}
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index f6b18f9..fbe21eb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -18,10 +18,13 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -29,6 +32,10 @@ import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -61,6 +68,9 @@ import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.*;
/**
@@ -90,6 +100,10 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
@Getter
private final StateStore<org.apache.gobblin.configuration.State> stateStore;
private final ScheduledExecutorService scheduledExecutorService;
+ private static final Config RETRYER_FALLBACK_CONFIG =
ConfigFactory.parseMap(ImmutableMap.of(
+ RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume
non-transient and give up
+ RETRY_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1L), // back-off to
once/minute
+ RETRY_TYPE, RetryType.EXPONENTIAL.name()));
private static final Config DEFAULTS =
ConfigFactory.parseMap(ImmutableMap.of(
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
@@ -102,6 +116,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final ConcurrentHashMap<String, Long> flowNameGroupToWorkUnitCount;
+ private final Retryer<Void> persistJobStatusRetryer;
+
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
@@ -114,6 +130,24 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.jobIssueEventHandler = jobIssueEventHandler;
this.flowNameGroupToWorkUnitCount = new ConcurrentHashMap<>();
+
+ Config retryerOverridesConfig =
config.hasPath(ConfigurationKeys.KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES)
+ ? ConfigFactory.parseMap(ImmutableMap.of(
+ RETRY_TIME_OUT_MS,
+
config.getDuration(ConfigurationKeys.KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES,
TimeUnit.MINUTES)))
+ : ConfigFactory.empty();
+ // log exceptions to expose errors we suffer under and/or guide
intervention when resolution not readily forthcoming
+ this.persistJobStatusRetryer =
+
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() {
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) {
+ String msg = String.format("(Likely retryable) failure adding
job status to state store [attempt: %d; %s after start]",
+ attempt.getAttemptNumber(),
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+ log.warn(msg, attempt.getExceptionCause());
+ }
+ }
+ }));
}
@Override
@@ -147,33 +181,49 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
@Override
protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
- try {
- GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+ GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
- if (gobblinTrackingEvent == null) {
- return;
- }
+ if (gobblinTrackingEvent == null) {
+ return;
+ }
- if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
- try (Timer.Context context =
getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
- jobIssueEventHandler.processEvent(gobblinTrackingEvent);
- }
+ if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+ try (Timer.Context context =
getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+ jobIssueEventHandler.processEvent(gobblinTrackingEvent);
}
+ }
- if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
- emitWorkUnitCountMetric(gobblinTrackingEvent);
- return;
- }
+ if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
+ emitWorkUnitCountMetric(gobblinTrackingEvent);
+ return;
+ }
- org.apache.gobblin.configuration.State jobStatus =
parseJobStatus(gobblinTrackingEvent);
- if (jobStatus != null) {
- try(Timer.Context context =
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
- addJobStatusToStateStore(jobStatus, this.stateStore);
+ try {
+ persistJobStatusRetryer.call(() -> {
+ // re-create `jobStatus` on each attempt, since mutated within
`addJobStatusToStateStore`
+ org.apache.gobblin.configuration.State jobStatus =
parseJobStatus(gobblinTrackingEvent);
+ if (jobStatus != null) {
+ try (Timer.Context context =
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
+ addJobStatusToStateStore(jobStatus, this.stateStore);
+ }
}
- }
- } catch (IOException ioe) {
+ return null;
+ });
+ } catch (ExecutionException ee) {
+ String msg = String.format("Failed to add job status to state store for
kafka offset %d", message.getOffset());
+ log.warn(msg, ee.getCause());
+ // Throw RuntimeException to avoid advancing kafka offsets without
updating state store
+ throw new RuntimeException(msg, ee.getCause());
+ } catch (RetryException re) {
+ String interruptedNote = Thread.currentThread().isInterrupted() ? "...
then interrupted" : "";
+ String msg = String.format("Failed to add job status to state store for
kafka offset %d (retried %d times%s)",
+ message.getOffset(), re.getNumberOfFailedAttempts(),
interruptedNote);
+ Throwable informativeException = re.getLastFailedAttempt().hasException()
+ ? re.getLastFailedAttempt().getExceptionCause()
+ : re;
+ log.warn(msg, informativeException);
// Throw RuntimeException to avoid advancing kafka offsets without
updating state store
- throw new RuntimeException("Failed to add job status to state store for
kafka offset " + message.getOffset(), ioe);
+ throw new RuntimeException(msg, informativeException);
}
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index 97b033d..df79d37 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.util.retry;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -24,6 +25,7 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Predicate;
@@ -83,47 +85,57 @@ public class RetryerFactory<T> {
* You can use State along with ConfigBuilder and config prefix to build
config.
*
* @param config
- * @return
+ * @param optRetryListener e.g. for logging failures
*/
- public static <T> Retryer<T> newInstance(Config config) {
+ public static <T> Retryer<T> newInstance(Config config,
Optional<RetryListener> optRetryListener) {
config = config.withFallback(DEFAULTS);
RetryType type =
RetryType.valueOf(config.getString(RETRY_TYPE).toUpperCase());
+ RetryerBuilder<T> builder;
switch (type) {
case EXPONENTIAL:
- return newExponentialRetryer(config);
+ builder = newExponentialRetryerBuilder(config);
+ break;
case FIXED:
- return newFixedRetryer(config);
+ builder = newFixedRetryerBuilder(config);
+ break;
case FIXED_ATTEMPT:
- return newFixedAttemptBoundRetryer(config);
+ builder = newFixedAttemptBoundRetryerBuilder(config);
+ break;
default:
throw new IllegalArgumentException(type + " is not supported");
}
+ optRetryListener.ifPresent(builder::withRetryListener);
+ return builder.build();
+ }
+
+ /**
+ * Creates new instance of retryer based on the config and having no {@link
RetryListener}
+ */
+ public static <T> Retryer<T> newInstance(Config config) {
+ return newInstance(config, Optional.empty());
}
- private static <T> Retryer<T> newFixedRetryer(Config config) {
+ private static <T> RetryerBuilder<T> newFixedRetryerBuilder(Config config) {
return RetryerBuilder.<T> newBuilder()
.retryIfException(RETRY_EXCEPTION_PREDICATE)
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS))
- .build();
+
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS));
}
- private static <T> Retryer<T> newExponentialRetryer(Config config) {
+ private static <T> RetryerBuilder<T> newExponentialRetryerBuilder(Config
config) {
return RetryerBuilder.<T> newBuilder()
.retryIfException(RETRY_EXCEPTION_PREDICATE)
.withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS))
- .build();
+
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS));
}
- private static <T> Retryer<T> newFixedAttemptBoundRetryer(Config config) {
+ private static <T> RetryerBuilder<T>
newFixedAttemptBoundRetryerBuilder(Config config) {
return RetryerBuilder.<T> newBuilder()
.retryIfException(RETRY_EXCEPTION_PREDICATE)
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES)))
- .build();
+
.withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES)));
}
}