This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new da6b1dfd0 [GOBBLIN-1941] Develop Temporal abstractions, including
`Workload` for workflows of unbounded size through sub-workflow nesting (#3811)
da6b1dfd0 is described below
commit da6b1dfd02400b6aafe5a650a6fb8d9388cdac0f
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Oct 30 16:22:58 2023 -0700
[GOBBLIN-1941] Develop Temporal abstractions, including `Workload` for
workflows of unbounded size through sub-workflow nesting (#3811)
* Define `Workload` abstraction for Temporal workflows of unbounded size
through sub-workflow nesting
* Adjust Gobblin-Temporal configurability for consistency and abstraction
* Define `WorkerConfig`, to pass the `TemporalWorker`'s configuration to
the workflows and activities it hosts
* Improve javadoc
* Javadoc fixup
* Minor changes
* Update per review suggestions
* Insert pause, to spread the load on the temporal server, before launch of
each child workflow that may have direct leaves of its own
* Appease findbugs by having `SeqSliceBackedWorkSpan::next` throw
`NoSuchElementException`
* Add comment
---
.../temporal/GobblinTemporalConfigurationKeys.java | 9 +-
.../temporal/cluster/AbstractTemporalWorker.java | 26 +++-
.../cluster/GobblinTemporalTaskRunner.java | 12 +-
.../gobblin/temporal/cluster/TemporalWorker.java | 28 ++++
.../gobblin/temporal/cluster/WorkerConfig.java | 72 +++++++++
.../joblauncher/GobblinTemporalJobScheduler.java | 2 +-
.../util/nesting/work/SeqBackedWorkSpan.java | 70 +++++++++
.../util/nesting/work/SeqSliceBackedWorkSpan.java | 75 ++++++++++
.../temporal/util/nesting/work/WorkflowAddr.java | 65 ++++++++
.../temporal/util/nesting/work/Workload.java | 57 +++++++
.../workflow/AbstractNestingExecWorkflowImpl.java | 164 +++++++++++++++++++++
.../util/nesting/workflow/NestingExecWorkflow.java | 55 +++++++
12 files changed, 619 insertions(+), 16 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 63e9adc38..f238ffc9b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -30,15 +30,18 @@ public interface GobblinTemporalConfigurationKeys {
String PREFIX = "gobblin.temporal.";
- String WORKER_CLASS = PREFIX + "worker";
+ String WORKER_CLASS = PREFIX + "worker.class";
String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
- String GOBBLIN_TEMPORAL_JOB_LAUNCHER = PREFIX + "job.launcher";
- String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER =
HelloWorldJobLauncher.class.getName();
+ String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher.";
+ String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class";
+ String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS =
HelloWorldJobLauncher.class.getName();
+
+ String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
/**
* Number of worker processes to spin up per task runner
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
index 856257f28..0ed6652eb 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
@@ -17,17 +17,21 @@
package org.apache.gobblin.temporal.cluster;
+import java.util.Arrays;
+
import com.typesafe.config.Config;
import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
+import io.temporal.worker.WorkerOptions;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
-public abstract class AbstractTemporalWorker {
+/** Basic boilerplate for a {@link TemporalWorker} to register its activity
and workflow capabilities and listen on a particular queue */
+public abstract class AbstractTemporalWorker implements TemporalWorker {
private final WorkflowClient workflowClient;
private final String queueName;
private final WorkerFactory workerFactory;
@@ -42,10 +46,13 @@ public abstract class AbstractTemporalWorker {
// Create a Worker factory that can be used to create Workers that
poll specific Task Queues.
workerFactory = WorkerFactory.newInstance(workflowClient);
+
+ stashWorkerConfig(cfg);
}
+ @Override
public void start() {
- Worker worker = workerFactory.newWorker(queueName);
+ Worker worker = workerFactory.newWorker(queueName,
createWorkerOptions());
// This Worker hosts both Workflow and Activity implementations.
// Workflows are stateful, so you need to supply a type to create
instances.
worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
@@ -55,16 +62,25 @@ public abstract class AbstractTemporalWorker {
workerFactory.start();
}
- /**
- * Shuts down the worker.
- */
+ @Override
public void shutdown() {
workerFactory.shutdown();
}
+ protected WorkerOptions createWorkerOptions() {
+ return null;
+ }
+
/** @return workflow types for *implementation* classes (not interface) */
protected abstract Class<?>[] getWorkflowImplClasses();
/** @return activity instances; NOTE: activities must be stateless and
thread-safe, so a shared instance is used. */
protected abstract Object[] getActivityImplInstances();
+
+ private final void stashWorkerConfig(Config cfg) {
+ // stash in association with...
+ WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
+ Arrays.stream(getWorkflowImplClasses()).forEach(clazz ->
WorkerConfig.withImpl(clazz, cfg)); // its workflow impls
+ Arrays.stream(getActivityImplInstances()).forEach(obj ->
WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index 79cb0baf7..61545f6ed 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -124,7 +124,7 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
protected final String temporalQueueName;
private final boolean isMetricReportingFailureFatal;
private final boolean isEventReportingFailureFatal;
- private final List<AbstractTemporalWorker> workers;
+ private final List<TemporalWorker> workers;
public GobblinTemporalTaskRunner(String applicationName,
String applicationId,
@@ -234,7 +234,7 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
}
}
- private AbstractTemporalWorker initiateWorker() throws Exception{
+ private TemporalWorker initiateWorker() throws Exception {
logger.info("Starting Temporal Worker");
String connectionUri =
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
@@ -246,8 +246,8 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
- AbstractTemporalWorker worker =
GobblinConstructorUtils.invokeLongestConstructor(
- (Class<AbstractTemporalWorker>) Class.forName(workerClassName),
clusterConfig, client);
+ TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
+ (Class<TemporalWorker>)Class.forName(workerClassName), clusterConfig,
client);
worker.start();
logger.info("A new worker is started.");
return worker;
@@ -286,9 +286,7 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
this.containerMetrics.get().stopMetricsReporting();
}
- for (AbstractTemporalWorker worker : workers) {
- worker.shutdown();
- }
+ workers.forEach(TemporalWorker::shutdown);
logger.info("All services are stopped.");
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java
new file mode 100644
index 000000000..5474f6421
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+/** Interface for a temporal.io "worker", with capability to `start()`
and `shutdown()` */
+public interface TemporalWorker {
+
+ /** Starts the worker */
+ void start();
+
+ /** Shuts down the worker */
+ void shutdown();
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java
new file mode 100644
index 000000000..ea581df25
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Static holder to stash the {@link Config} used to construct each kind of
{@link org.apache.gobblin.temporal.cluster.TemporalWorker}
+ * (within the current JVM). Lookup may be by either the {@link Class} of the
worker or of any workflow or activity implementation supplied by
+ * that worker. The objective is to facilitate sharing the worker's Config
with workflow and activity implementations (running within that worker).
+ *
+ * ATTENTION: for sanity, construct multiple instances of the same worker
always with the same {@link Config}. When this is violated, the `Config`
+ * given to the most-recently constructed worker "wins".
+ *
+ * NOTE: the preservation and sharing of {@link Config} is predicated entirely
on its immutability. Thank you TypeSafe!
+ * Storage indexing uses FQ class name, not the {@link Class}, to be
independent of classloader.
+ */
+@Slf4j
+public class WorkerConfig {
+ private static final ConcurrentHashMap<String, Config> configByFQClassName =
new ConcurrentHashMap<>();
+
+ private WorkerConfig() {}
+
+ /** @return whether initialized now (vs. being previously known) */
+ public static boolean forWorker(Class<? extends TemporalWorker> workerClass,
Config config) {
+ return storeAs(workerClass.getName(), config);
+ }
+
+ /** @return whether initialized now (vs. being previously known) */
+ public static boolean withImpl(Class<?> workflowOrActivityImplClass, Config
config) {
+ return storeAs(workflowOrActivityImplClass.getName(), config);
+ }
+
+ public static Optional<Config> ofWorker(Class<? extends TemporalWorker>
workerClass) {
+ return Optional.ofNullable(configByFQClassName.get(workerClass.getName()));
+ }
+
+ public static Optional<Config> ofImpl(Class<?> workflowOrActivityImplClass) {
+ return
Optional.ofNullable(configByFQClassName.get(workflowOrActivityImplClass.getName()));
+ }
+
+ public static Optional<Config> of(Object workflowOrActivityImpl) {
+ return ofImpl(workflowOrActivityImpl.getClass());
+ }
+
+ private static boolean storeAs(String className, Config config) {
+ Config prior = configByFQClassName.put(className, config);
+ log.info("storing config of {} values as '{}'{}",
config.entrySet().size(), className, prior == null ? " (new)" : "");
+ return prior == null;
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index 1b1ee14a4..d93e2a670 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -180,7 +180,7 @@ public class GobblinTemporalJobScheduler extends
JobScheduler implements Standar
Class<? extends GobblinTemporalJobLauncher> jobLauncherClass =
(Class<? extends GobblinTemporalJobLauncher>)
Class.forName(combinedProps.getProperty(
- GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER));
+ GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS));
return GobblinConstructorUtils.invokeLongestConstructor(jobLauncherClass,
combinedProps,
this.appWorkDir,
this.metadataTags,
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java
new file mode 100644
index 000000000..51f67b859
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.Iterator;
+import java.util.List;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an
in-memory collection */
+@NoArgsConstructor
+@RequiredArgsConstructor
+public class SeqBackedWorkSpan<WORK_ITEM> implements
Workload.WorkSpan<WORK_ITEM> {
+
+ @NonNull
+ private List<WORK_ITEM> elems;
+ // CAUTION: despite the "warning: @NonNull is meaningless on a primitive
@lombok.RequiredArgsConstructor"...
+ // if removed, no two-arg ctor is generated, so syntax error on `new
SeqBackedWorkSpan(elems, startIndex)`
+ @NonNull
+ private int startingIndex;
+ private transient Iterator<WORK_ITEM> statefulDelegatee = null;
+
+ @Override
+ public int getNumElems() {
+ return elems.size();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (statefulDelegatee == null) {
+ statefulDelegatee = elems.iterator();
+ }
+ return statefulDelegatee.hasNext();
+ }
+
+ @Override
+ public WORK_ITEM next() {
+ if (statefulDelegatee == null) {
+ throw new IllegalStateException("first call `hasNext()`!");
+ }
+ return statefulDelegatee.next();
+ }
+
+ @Override
+ public String toString() {
+ return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems()
+ "})";
+ }
+
+ protected String getClassNickname() {
+ // return getClass().getSimpleName();
+ return "WorkSpan";
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java
new file mode 100644
index 000000000..8b649ac51
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.NoSuchElementException;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+
+/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an
in-memory collection, *SHARED* w/ other work spans */
+@NoArgsConstructor
+@RequiredArgsConstructor
+public class SeqSliceBackedWorkSpan<WORK_ITEM> implements
Workload.WorkSpan<WORK_ITEM> {
+ private static final int NOT_SET_SENTINEL = -1;
+
+ @NonNull private WORK_ITEM[] sharedElems;
+ // CAUTION: despite the "warning: @NonNull is meaningless on a primitive
@lombok.RequiredArgsConstructor"...
+ // if removed, no three-arg ctor is generated, so syntax error on `new
SeqSliceBackedWorkSpan(elems, startIndex, numElements)`
+ @NonNull private int startingIndex;
+ @NonNull private int numElements;
+ private transient volatile int nextElemIndex = NOT_SET_SENTINEL;
+
+ @Override
+ public int getNumElems() {
+ return getEndingIndex() - startingIndex;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextElemIndex == NOT_SET_SENTINEL) {
+ nextElemIndex = startingIndex; // NOTE: `startingIndex` should be
effectively `final` (post-deser) and always >= 0
+ }
+ return nextElemIndex < this.getEndingIndex();
+ }
+
+ @Override
+ public WORK_ITEM next() {
+ if (nextElemIndex >= startingIndex + numElements) {
+ throw new NoSuchElementException("index " + nextElemIndex + " >= " +
startingIndex + " + " + numElements);
+ }
+ return sharedElems[nextElemIndex++];
+ }
+
+ @Override
+ public String toString() {
+ return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems()
+ "})";
+ }
+
+ protected String getClassNickname() {
+ // return getClass().getSimpleName();
+ return "WorkSpan";
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ protected final int getEndingIndex() {
+ return Math.min(startingIndex + numElements, sharedElems.length);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
new file mode 100644
index 000000000..0329d90d9
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Hierarchical address for nesting workflows (0-based). */
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkflowAddr {
+ public static final String SEP = ".";
+
+ /** initial, top-level address */
+ public static final WorkflowAddr ROOT = new WorkflowAddr(0);
+
+ @Getter
+ @NonNull // IMPORTANT: for jackson (de)serialization (which won't permit
`final`)
+ private List<Integer> segments;
+
+ public WorkflowAddr(final int firstLevelOnly) {
+ this(Lists.newArrayList(firstLevelOnly));
+ }
+
+ /** @return 0-based depth */
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public int getDepth() {
+ return segments.size() - 1;
+ }
+
+ /** Create a child of the current `WorkflowAddr` */
+ public WorkflowAddr createChild(int childLevel) {
+ final List<Integer> copy = new ArrayList<>(segments);
+ copy.add(childLevel);
+ return new WorkflowAddr(copy);
+ }
+
+ @Override
+ public String toString() {
+ return Joiner.on(SEP).join(segments);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
new file mode 100644
index 000000000..239825f7c
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import java.util.Iterator;
+import java.util.Optional;
+
+
+/**
+ * {@link Workload} models a logical collection of homogeneous inputs over
which a "foreach" operation can asynchronously apply
+ * an arbitrary procedure to each element. This encapsulates "processing" the
entire collection of sequential
+ * "work item" specifications by the uniform application of the chosen
procedure(s).
+ *
+ * Given Temporal's required determinism, the work items and work spans should
remain unchanged, with stable sequential
+ * ordering. This need not constrain `Workload`s to eager, advance
elaboration: "streaming" definition is possible,
+ * so long as producing a deterministic result.
+ *
+ * An actual, real-world workload might correspond to datastore contents, such
as records serialized into HDFS files
+ * or ordered DB query results.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY,
property = "@class") // to handle impls
+public interface Workload<WORK_ITEM> {
+
+ /**
+ * @return a sequential sub-sequence, from `startIndex` (0-based), unless it
falls beyond the underlying sequence
+ * NOTE: this is a blocking call that forces elaboration:
`WorkSpan.getNumElems() < numElements` signifies end of seq
+ */
+ Optional<WorkSpan<WORK_ITEM>> getSpan(int startIndex, int numElements);
+
+ /** Non-blocking, best-effort advice: to support non-strict elaboration,
does NOT guarantee `index` will not exceed */
+ boolean isIndexKnownToExceed(int index);
+
+ default boolean isDefiniteSize() {
+ return false;
+ }
+
+ /** Logical sub-sequence 'slice' of contiguous work items */
+ public interface WorkSpan<T> extends Iterator<T> {
+ int getNumElems();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
new file mode 100644
index 000000000..0dcf19a77
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.workflow;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.Lists;
+
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only
define {@link #launchAsyncActivity} */
+@Slf4j
+public abstract class AbstractNestingExecWorkflowImpl<WORK_ITEM,
ACTIVITY_RESULT> implements NestingExecWorkflow<WORK_ITEM> {
+ public static final int
NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT = 10;
+ public static final int
MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100;
+
+ @Override
+ public int performWorkload(
+ final WorkflowAddr addr,
+ final Workload<WORK_ITEM> workload,
+ final int startIndex,
+ final int maxBranchesPerTree,
+ final int maxSubTreesPerTree,
+ final Optional<Integer> maxSubTreesForCurrentTreeOverride
+ ) {
+ final int maxSubTreesForCurrent =
maxSubTreesForCurrentTreeOverride.orElse(maxSubTreesPerTree);
+ final int maxLeaves = maxBranchesPerTree - maxSubTreesForCurrent;
+ final Optional<Workload.WorkSpan<WORK_ITEM>> optSpan =
workload.getSpan(startIndex, maxLeaves);
+ log.info("[" + addr + "] " + workload + " w/ start '" + startIndex + "'"
+ + "; tree (" + maxBranchesPerTree + "/" + maxSubTreesPerTree + "): " +
optSpan);
+ if (!optSpan.isPresent()) {
+ return 0;
+ } else {
+ final Workload.WorkSpan<WORK_ITEM> workSpan = optSpan.get();
+ final Iterable<WORK_ITEM> iterable = () -> workSpan;
+ final List<Promise<ACTIVITY_RESULT>> childActivities =
StreamSupport.stream(iterable.spliterator(), false)
+ .map(t -> launchAsyncActivity(t))
+ .collect(Collectors.toList());
+ final List<Promise<Integer>> childSubTrees = new ArrayList<>();
+ if (workSpan.getNumElems() == maxLeaves) { // received as many as
requested (did not stop short)
+ int subTreeId = 0;
+ for (int subTreeChildMaxSubTreesPerTree
+ : consolidateSubTreeGrandChildren(maxSubTreesForCurrent,
maxBranchesPerTree, maxSubTreesPerTree)) {
+ // CAUTION: calc these *before* incrementing `subTreeId`!
+ final int childStartIndex = startIndex + maxLeaves +
(maxBranchesPerTree * subTreeId);
+ final int nextChildId = maxLeaves + subTreeId;
+ final WorkflowAddr childAddr = addr.createChild(nextChildId);
+ final NestingExecWorkflow<WORK_ITEM> child =
createChildWorkflow(childAddr);
+ if (!workload.isIndexKnownToExceed(childStartIndex)) { //
best-effort short-circuiting
+ // IMPORTANT: insert pause before launch of each child workflow
that may have direct leaves of its own. periodic pauses spread the load on the
+ // temporal server, to avoid a sustained burst from submitting
potentially very many async activities over the full hierarchical elaboration
+ final int numDirectLeavesChildMayHave = maxBranchesPerTree -
subTreeChildMaxSubTreesPerTree;
+ if (numDirectLeavesChildMayHave > 0) {
+
Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave));
+ }
+ childSubTrees.add(
+ Async.function(child::performWorkload, childAddr, workload,
childStartIndex, maxBranchesPerTree,
+ maxSubTreesPerTree,
Optional.of(subTreeChildMaxSubTreesPerTree)));
+ ++subTreeId;
+ }
+ }
+ }
+ final Promise<Void> allActivityChildren = Promise.allOf(childActivities);
+ allActivityChildren.get(); // ensure all complete prior to counting them
in `overallActivitiesRollupCount`
+ // TODO: determine whether any benefit to unordered `::get` blocking for
any next ready (perhaps no difference...)
+ final int descendantActivitiesRollupCount =
childSubTrees.stream().map(Promise::get).reduce(0, (x, y) -> x + y);
+ // TODO: consider a generalized reduce op for things other than counting!
+ final int overallActivitiesRollupCount = workSpan.getNumElems() +
descendantActivitiesRollupCount;
+ log.info("[" + addr + "] activites finished coordinating: " +
overallActivitiesRollupCount);
+ return overallActivitiesRollupCount;
+ }
+ }
+
+ /**
+ * Factory for invoking the specific activity, by providing it args via {@link Async#function}.
+ *
+ * @param task the work item to provide as the activity's arg
+ * @return a {@link Promise} for the result of the launched activity
+ */
+ protected abstract Promise<ACTIVITY_RESULT> launchAsyncActivity(WORK_ITEM
task);
+
+ /**
+ * Create the stub for a child workflow to be identified by `childAddr`.
+ *
+ * The child's workflow ID is this (parent) workflow's ID, less any final `-`-delimited segment, with `-` and
+ * `childAddr` appended. The child's options set {@link ParentClosePolicy#PARENT_CLOSE_POLICY_ABANDON}.
+ *
+ * @param childAddr the (hierarchical) address for the child
+ * @return a child workflow stub, created with the workflow ID and options described above
+ */
+ protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final
WorkflowAddr childAddr) {
+ // preserve the current workflow ID of this parent, but add the
(hierarchical) address extension specific to each child
+ String thisWorkflowId = Workflow.getInfo().getWorkflowId();
+ // (the regex strips a trailing `-<segment>`, when present, before the child's address gets appended)
+ String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" +
childAddr;
+ ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+ .setWorkflowId(childWorkflowId)
+ .build();
+ return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+ }
+
+ /**
+ * Decide how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave`.
+ *
+ * @param numDirectLeavesChildMayHave the number of direct leaves the child workflow may have
+ * @return `NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT` seconds, when `numDirectLeavesChildMayHave`
+ * strictly exceeds `MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT`; otherwise {@link Duration#ZERO}
+ */
+ protected Duration calcPauseDurationBeforeCreatingSubTree(int
numDirectLeavesChildMayHave) {
+ // (only pause when an appreciable number of leaves)
+ // TODO: use a configuration value, for simpler adjustment, rather than
hard-code
+ return numDirectLeavesChildMayHave >
MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
+ ?
Duration.ofSeconds(NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT)
+ : Duration.ZERO;
+ }
+
+ /**
+ * "right-tilt" sub-tree's grandchildren, so final child gets all grandchildren (vs. constant grandchildren/child)
+ * i.e. NOT!:
+ * List<Integer> naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren);
+ *
+ * Outcome, by case:
+ * - `numSubTreesPerSubTree <= 0`: an empty list
+ * - `numSubTreeChildren` squared equals `numChildrenTotal`: `numSubTreesPerSubTree - 1` zeros, then `numChildrenTotal`
+ * - otherwise: leading zeros, then (at most one) entry holding the remainder, then `numChildrenTotal` for each
+ * sub-tree that is to branch solely into sub-trees (presently limited, by assertion, to at most one)
+ *
+ * @param numSubTreesPerSubTree how many child sub-trees to produce sizes for
+ * @param numChildrenTotal max branches of a tree (the caller within this class passes `maxBranchesPerTree`)
+ * @param numSubTreeChildren max sub-trees of a tree (the caller within this class passes `maxSubTreesPerTree`)
+ * @return each sub-tree's desired size, in ascending sub-tree order
+ */
+ protected static List<Integer> consolidateSubTreeGrandChildren(
+ final int numSubTreesPerSubTree,
+ final int numChildrenTotal,
+ final int numSubTreeChildren
+ ) {
+ if (numSubTreesPerSubTree <= 0) {
+ return Lists.newArrayList();
+ } else if (isSqrt(numSubTreeChildren, numChildrenTotal)) {
+ // redistribute all grandchild sub-trees to pack every grandchild
beneath the final child sub-tree
+ final List<Integer> grandChildCounts = new
ArrayList<>(Collections.nCopies(numSubTreesPerSubTree - 1, 0));
+ grandChildCounts.add(numChildrenTotal);
+ return grandChildCounts;
+ } else {
+ // split the overall grandchild sub-tree budget into whole trees' worth (quotient) plus a leftover (remainder)
+ final int totalGrandChildSubTrees = numSubTreesPerSubTree *
numSubTreeChildren;
+ final int numTreesWithSolelySubTreeBranches = totalGrandChildSubTrees /
numChildrenTotal;
+ final int numSubTreesRemaining = totalGrandChildSubTrees %
numChildrenTotal;
+ // NOTE: as an `assert`, this limitation is only verified when the JVM runs with assertions enabled (`-ea`)
+ assert (numTreesWithSolelySubTreeBranches == 1 && numSubTreesRemaining
== 0) || numTreesWithSolelySubTreeBranches == 0
+ : "present limitation: at most one sub-tree may use further
branching: (found: numSubTreesPerSubTree: "
+ + numSubTreesPerSubTree + "; numChildrenTotal: " + numChildrenTotal
+ " / numSubTreeChildren: "
+ + numSubTreeChildren + ")";
+ // assemble in ascending sub-tree order: zeros first, next the remainder, and lastly any full allotment
+ final List<Integer> grandChildCounts = new
ArrayList<>(Collections.nCopies(numSubTreesPerSubTree -
(numTreesWithSolelySubTreeBranches + 1), 0));
+ grandChildCounts.addAll(Collections.nCopies(Math.min(1,
numSubTreesPerSubTree - numTreesWithSolelySubTreeBranches),
numSubTreesRemaining));
+
grandChildCounts.addAll(Collections.nCopies(Math.min(numTreesWithSolelySubTreeBranches,
numSubTreesPerSubTree), numChildrenTotal));
+ return grandChildCounts;
+ }
+ }
+
+ /**
+ * Integer-only check of whether `maxSubTrees` is the exact square root of `maxBranches`.
+ *
+ * NOTE(review): `maxSubTrees * maxSubTrees` is `int` arithmetic, hence would wrap around for `maxSubTrees` beyond
+ * 46340 - presumably far larger than any realistic setting; confirm
+ *
+ * @return whether `maxSubTrees` == `Math.sqrt(maxBranches)`; always `false` when `maxSubTrees` is not positive
+ */
+ private static boolean isSqrt(int maxSubTrees, int maxBranches) {
+ return maxSubTrees > 0 && maxSubTrees * maxSubTrees == maxBranches;
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
new file mode 100644
index 000000000..3a6661d09
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.workflow;
+
+import java.util.Optional;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/**
+ * Process all `WORK_ITEM`s of {@link Workload}, from `startIndex` to the end, by creating child workflows, where this
+ * and descendants should have at most `maxBranchesPerTree`, with at most `maxSubTreesPerTree` of those being child
+ * workflows. (Non-child-workflow (terminal) branches are the activity executions.)
+ *
+ * The underlying motivation is to create logical workflows of unbounded size, despite Temporal's event history limit
+ * of 50Ki events; see: https://docs.temporal.io/workflows#event-history
+ *
+ * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a good rule-of-thumb; `maxSubTreesPerTree`
+ * must not exceed that. This enables consolidation, wherein continued expansion occurs only along the tree's
+ * right-most edges.
+ *
+ * @param <WORK_ITEM> the type of task for which to invoke an appropriate activity
+ */
+@WorkflowInterface
+public interface NestingExecWorkflow<WORK_ITEM> {
+ /**
+ * @param addr the (hierarchical) address of the workflow performing this call
+ * @param workload the {@link Workload} supplying the `WORK_ITEM`s to process
+ * @param startIndex the index within `workload` from which to process through to the end
+ * @param maxBranchesPerTree the most branches this and descendants should have
+ * @param maxSubTreesPerTree the most of those branches that may be child workflows
+ * @param maxSubTreesForCurrentTreeOverride when the current tree should use different max sub-trees than descendants
+ * @return the number of workload elements processed cumulatively by this Workflow and its children
+ */
+ @WorkflowMethod
+ int performWorkload(
+ WorkflowAddr addr,
+ Workload<WORK_ITEM> workload,
+ int startIndex,
+ int maxBranchesPerTree,
+ int maxSubTreesPerTree,
+ Optional<Integer> maxSubTreesForCurrentTreeOverride
+ );
+}