[ https://issues.apache.org/jira/browse/GOBBLIN-1996?focusedWorklogId=904339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-904339 ]
ASF GitHub Bot logged work on GOBBLIN-1996:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Feb/24 02:20
Start Date: 09/Feb/24 02:20
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3865:
URL: https://github.com/apache/gobblin/pull/3865#discussion_r1483797872
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java:
##########
@@ -70,7 +70,9 @@ private static Instant getCurrentTime() {
}
public static class Factory {
- private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder().build();
+ private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofHours(24))
Review Comment:
What are the ramifications of this? A job could still run for more than 24 hours, right?
Does this merely constrain the activity that sends GTEs, so that each send must
finish within 24 hours?
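For context on the question above: in Temporal, a start-to-close timeout bounds a single attempt of an activity, not the workflow that schedules it. The sketch below is a plain-Java analogy of that distinction and does not use Temporal's API or reflect how Temporal implements timeouts. The class PerAttemptTimeoutSketch and its runAttempt method are hypothetical names invented for illustration.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only: a per-attempt time limit in plain Java.
class PerAttemptTimeoutSketch {

  // Runs one attempt on its own thread. Returns true if the attempt finished
  // within the limit, false if it was cut off. The limit applies to this single
  // call; a caller is free to make many such calls over a much longer period.
  static boolean runAttempt(Callable<Void> attempt, Duration limit) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<Void> future = executor.submit(attempt);
      try {
        future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
        return true;
      } catch (TimeoutException e) {
        future.cancel(true); // interrupt the attempt that overran its limit
        return false;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("attempt failed", e.getCause());
      }
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // A quick attempt fits inside its limit.
    System.out.println(runAttempt(() -> null, Duration.ofSeconds(2)));
    // A slow attempt is cut off, but only that one attempt is affected.
    System.out.println(runAttempt(() -> {
      Thread.sleep(5000);
      return null;
    }, Duration.ofMillis(100)));
  }
}
```

In this analogy each runAttempt call plays the role of one activity attempt, while the code calling it repeatedly plays the role of the longer-running job.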
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkunitsInfiniteSource.java:
##########
@@ -0,0 +1,83 @@
+package org.apache.gobblin.temporal.ddm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.eventbus.EventBus;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+/**
+ * An implementation of {@link InfiniteSource} that does not generate any workunits. This is helpful when the {@link io.temporal.workflow.Workflow}
+ * is driven by the job launcher and not the source. I.e. we want the discovery to be triggered on a {@link io.temporal.worker.Worker} and not the
+ * {@link org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager}
+ *
+ * This class also implements the {@link InfiniteSource} to provide hooks for communicating with the
+ * {@link org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher} via the {@link EventBus}.
+ */
+@Slf4j
+public class NoWorkunitsInfiniteSource implements InfiniteSource {
+
+ private final EventBus eventBus = new EventBus(this.getClass().getSimpleName());
+
+ public NoWorkunitsInfiniteSource() {
+ }
Review Comment:
FYI, the compiler generates this same default ctor for you, so the explicit one can be dropped
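The point about the default constructor can be checked with reflection. This is a minimal sketch, assuming two hypothetical nested classes (WithExplicitCtor and WithImplicitCtor) that stand in for the class under review; neither is code from the PR. For a public class that declares no constructor, the Java compiler supplies a public no-argument one.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical illustration only: compares an explicit empty constructor
// with the one the compiler generates when none is declared.
class DefaultCtorDemo {

  public static class WithExplicitCtor {
    public WithExplicitCtor() {
    }
  }

  public static class WithImplicitCtor {
    // No constructor declared: the compiler supplies a no-arg one.
  }

  // True if the class declares exactly one constructor, and that constructor
  // is public and takes no parameters.
  static boolean hasOnlyPublicNoArgCtor(Class<?> type) {
    Constructor<?>[] ctors = type.getDeclaredConstructors();
    return ctors.length == 1
        && ctors[0].getParameterCount() == 0
        && Modifier.isPublic(ctors[0].getModifiers());
  }

  public static void main(String[] args) {
    System.out.println(hasOnlyPublicNoArgCtor(WithExplicitCtor.class));
    System.out.println(hasOnlyPublicNoArgCtor(WithImplicitCtor.class));
  }
}
```

Both classes expose the same single public no-argument constructor, which is why the explicit declaration adds nothing unless it is needed to attach annotations, Javadoc, or a narrower access modifier.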
Issue Time Tracking
-------------------
Worklog Id: (was: 904339)
Time Spent: 3h 10m (was: 3h)
> Add ability for Yarn app to terminate on finishing of temporal flow
> -------------------------------------------------------------------
>
> Key: GOBBLIN-1996
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1996
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>