This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new da6b1dfd0 [GOBBLIN-1941] Develop Temporal abstractions, including 
`Workload` for workflows of unbounded size through sub-workflow nesting (#3811)
da6b1dfd0 is described below

commit da6b1dfd02400b6aafe5a650a6fb8d9388cdac0f
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Oct 30 16:22:58 2023 -0700

    [GOBBLIN-1941] Develop Temporal abstractions, including `Workload` for 
workflows of unbounded size through sub-workflow nesting (#3811)
    
    * Define `Workload` abstraction for Temporal workflows of unbounded size 
through sub-workflow nesting
    
    * Adjust Gobblin-Temporal configurability for consistency and abstraction
    
    * Define `WorkerConfig`, to pass the `TemporalWorker`'s configuration to 
the workflows and activities it hosts
    
    * Improve javadoc
    
    * Javadoc fixup
    
    * Minor changes
    
    * Update per review suggestions
    
    * Insert pause, to spread the load on the temporal server, before launch of 
each child workflow that may have direct leaves of its own
    
    * Appease findbugs by having `SeqSliceBackedWorkSpan::next` throw 
`NoSuchElementException`
    
    * Add comment
---
 .../temporal/GobblinTemporalConfigurationKeys.java |   9 +-
 .../temporal/cluster/AbstractTemporalWorker.java   |  26 +++-
 .../cluster/GobblinTemporalTaskRunner.java         |  12 +-
 .../gobblin/temporal/cluster/TemporalWorker.java   |  28 ++++
 .../gobblin/temporal/cluster/WorkerConfig.java     |  72 +++++++++
 .../joblauncher/GobblinTemporalJobScheduler.java   |   2 +-
 .../util/nesting/work/SeqBackedWorkSpan.java       |  70 +++++++++
 .../util/nesting/work/SeqSliceBackedWorkSpan.java  |  75 ++++++++++
 .../temporal/util/nesting/work/WorkflowAddr.java   |  65 ++++++++
 .../temporal/util/nesting/work/Workload.java       |  57 +++++++
 .../workflow/AbstractNestingExecWorkflowImpl.java  | 164 +++++++++++++++++++++
 .../util/nesting/workflow/NestingExecWorkflow.java |  55 +++++++
 12 files changed, 619 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 63e9adc38..f238ffc9b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -30,15 +30,18 @@ public interface GobblinTemporalConfigurationKeys {
 
   String PREFIX = "gobblin.temporal.";
 
-  String WORKER_CLASS = PREFIX + "worker";
+  String WORKER_CLASS = PREFIX + "worker.class";
   String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
   String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
   String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
 
   String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
   String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
-  String GOBBLIN_TEMPORAL_JOB_LAUNCHER = PREFIX + "job.launcher";
-  String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER = 
HelloWorldJobLauncher.class.getName();
+  String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher.";
+  String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class";
+  String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = 
HelloWorldJobLauncher.class.getName();
+
+  String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
 
   /**
    * Number of worker processes to spin up per task runner
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
index 856257f28..0ed6652eb 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
@@ -17,17 +17,21 @@
 
 package org.apache.gobblin.temporal.cluster;
 
+import java.util.Arrays;
+
 import com.typesafe.config.Config;
 
 import io.temporal.client.WorkflowClient;
 import io.temporal.worker.Worker;
 import io.temporal.worker.WorkerFactory;
+import io.temporal.worker.WorkerOptions;
 
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
 
-public abstract class AbstractTemporalWorker {
+/** Basic boilerplate for a {@link TemporalWorker} to register its activity 
and workflow capabilities and listen on a particular queue */
+public abstract class AbstractTemporalWorker implements TemporalWorker {
     private final WorkflowClient workflowClient;
     private final String queueName;
     private final WorkerFactory workerFactory;
@@ -42,10 +46,13 @@ public abstract class AbstractTemporalWorker {
 
         // Create a Worker factory that can be used to create Workers that 
poll specific Task Queues.
         workerFactory = WorkerFactory.newInstance(workflowClient);
+
+        stashWorkerConfig(cfg);
     }
 
+    @Override
     public void start() {
-        Worker worker = workerFactory.newWorker(queueName);
+        Worker worker = workerFactory.newWorker(queueName, 
createWorkerOptions());
         // This Worker hosts both Workflow and Activity implementations.
         // Workflows are stateful, so you need to supply a type to create 
instances.
         worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
@@ -55,16 +62,25 @@ public abstract class AbstractTemporalWorker {
         workerFactory.start();
     }
 
-    /**
-     * Shuts down the worker.
-     */
+    @Override
     public void shutdown() {
         workerFactory.shutdown();
     }
 
+    protected WorkerOptions createWorkerOptions() { // hook to customize the `Worker` created in `start()`; NOTE(review): `null` presumably yields temporal's default options - TODO confirm
+        return null;
+    }
+
     /** @return workflow types for *implementation* classes (not interface) */
     protected abstract Class<?>[] getWorkflowImplClasses();
 
     /** @return activity instances; NOTE: activities must be stateless and 
thread-safe, so a shared instance is used. */
     protected abstract Object[] getActivityImplInstances();
+
+    private final void stashWorkerConfig(Config cfg) {
+        // stash in association with... (NOTE(review): appears to run from the ctor, so the overridable getters below execute before any subclass fields are initialized)
+        WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
+        Arrays.stream(getWorkflowImplClasses()).forEach(clazz -> WorkerConfig.withImpl(clazz, cfg)); // its workflow impls
+        Arrays.stream(getActivityImplInstances()).forEach(obj -> WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls
+    }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index 79cb0baf7..61545f6ed 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -124,7 +124,7 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
   protected final String temporalQueueName;
   private final boolean isMetricReportingFailureFatal;
   private final boolean isEventReportingFailureFatal;
-  private final List<AbstractTemporalWorker> workers;
+  private final List<TemporalWorker> workers;
 
   public GobblinTemporalTaskRunner(String applicationName,
       String applicationId,
@@ -234,7 +234,7 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
     }
   }
 
-  private AbstractTemporalWorker initiateWorker() throws Exception{
+  private TemporalWorker initiateWorker() throws Exception {
     logger.info("Starting Temporal Worker");
 
     String connectionUri = 
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
@@ -246,8 +246,8 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
 
     String workerClassName = ConfigUtils.getString(clusterConfig,
         GobblinTemporalConfigurationKeys.WORKER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
-    AbstractTemporalWorker worker = 
GobblinConstructorUtils.invokeLongestConstructor(
-        (Class<AbstractTemporalWorker>) Class.forName(workerClassName), 
clusterConfig, client);
+    TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
+        (Class<TemporalWorker>)Class.forName(workerClassName), clusterConfig, 
client);
     worker.start();
     logger.info("A new worker is started.");
     return worker;
@@ -286,9 +286,7 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
       this.containerMetrics.get().stopMetricsReporting();
     }
 
-    for (AbstractTemporalWorker worker : workers) {
-      worker.shutdown();
-    }
+    workers.forEach(TemporalWorker::shutdown);
 
     logger.info("All services are stopped.");
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java
new file mode 100644
index 000000000..5474f6421
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+/** Lifecycle abstraction for a temporal.io "worker": it can be {@link #start()}ed and {@link #shutdown()} */
+public interface TemporalWorker {
+
+  /** Starts the worker */
+  void start();
+
+  /** Shuts down the worker */
+  void shutdown();
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java
new file mode 100644
index 000000000..ea581df25
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Process-wide (static) holder of the {@link Config} used to construct each kind of {@link TemporalWorker}, so that
+ * the workflow and activity implementations hosted by a worker may share that worker's `Config`.  Lookup is keyed by
+ * the {@link Class} of either the worker itself or any workflow or activity implementation it supplies.
+ *
+ * ATTENTION: for sanity, always construct every instance of a given worker class with the same {@link Config}.  When this
+ * is violated, whichever `Config` was stashed most recently replaces the prior one ("last writer wins").
+ *
+ * NOTE: preserving and sharing a {@link Config} this way is predicated entirely on its immutability (courtesy of TypeSafe).
+ */
+@Slf4j
+public class WorkerConfig {
+  /** Keyed by fully-qualified class name, rather than by {@link Class}, to be independent of classloader */
+  private static final ConcurrentHashMap<String, Config> CONFIG_BY_FQ_CLASS_NAME = new ConcurrentHashMap<>();
+
+  private WorkerConfig() {}
+
+  /**
+   * Stashes `config` for the given worker class.
+   * @return whether initialized now (vs. being previously known)
+   */
+  public static boolean forWorker(Class<? extends TemporalWorker> workerClass, Config config) {
+    return storeAs(workerClass.getName(), config);
+  }
+
+  /**
+   * Stashes `config` for the given workflow or activity implementation class.
+   * @return whether initialized now (vs. being previously known)
+   */
+  public static boolean withImpl(Class<?> workflowOrActivityImplClass, Config config) {
+    return storeAs(workflowOrActivityImplClass.getName(), config);
+  }
+
+  /** @return the {@link Config} most recently stashed for `workerClass`, if any */
+  public static Optional<Config> ofWorker(Class<? extends TemporalWorker> workerClass) {
+    return lookup(workerClass.getName());
+  }
+
+  /** @return the {@link Config} most recently stashed for `workflowOrActivityImplClass`, if any */
+  public static Optional<Config> ofImpl(Class<?> workflowOrActivityImplClass) {
+    return lookup(workflowOrActivityImplClass.getName());
+  }
+
+  /** @return the {@link Config} most recently stashed for the class of `workflowOrActivityImpl`, if any */
+  public static Optional<Config> of(Object workflowOrActivityImpl) {
+    return ofImpl(workflowOrActivityImpl.getClass());
+  }
+
+  private static Optional<Config> lookup(String fqClassName) {
+    return Optional.ofNullable(CONFIG_BY_FQ_CLASS_NAME.get(fqClassName));
+  }
+
+  /** Stores `config` under `className`, replacing any prior entry, and returns whether there was none */
+  private static boolean storeAs(String className, Config config) {
+    final Config prior = CONFIG_BY_FQ_CLASS_NAME.put(className, config);
+    final boolean isNew = (prior == null);
+    log.info("storing config of {} values as '{}'{}", config.entrySet().size(), className, isNew ? " (new)" : "");
+    return isNew;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index 1b1ee14a4..d93e2a670 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -180,7 +180,7 @@ public class GobblinTemporalJobScheduler extends 
JobScheduler implements Standar
 
     Class<? extends GobblinTemporalJobLauncher> jobLauncherClass =
         (Class<? extends GobblinTemporalJobLauncher>) 
Class.forName(combinedProps.getProperty(
-        GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER, 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER));
+        GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS));
     return GobblinConstructorUtils.invokeLongestConstructor(jobLauncherClass, 
combinedProps,
             this.appWorkDir,
             this.metadataTags,
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java
new file mode 100644
index 000000000..51f67b859
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.Iterator;
+import java.util.List;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection.
+ *
+ * Iteration is stateful and single-pass: the underlying iterator is created lazily, upon the first call to either
+ * {@link #hasNext()} or {@link #next()}, per the {@link Iterator} contract (which does not require calling `hasNext()` first).
+ */
+@NoArgsConstructor
+@RequiredArgsConstructor
+public class SeqBackedWorkSpan<WORK_ITEM> implements Workload.WorkSpan<WORK_ITEM> {
+
+  @NonNull
+  private List<WORK_ITEM> elems;
+  // CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"...
+  // if removed, no two-arg ctor is generated, so syntax error on `new SeqBackedWorkSpan(elems, startIndex)`
+  @NonNull
+  private int startingIndex;
+  private transient Iterator<WORK_ITEM> statefulDelegatee = null;
+
+  /** @return the total number of elements in this span, irrespective of how many were already iterated */
+  @Override
+  public int getNumElems() {
+    return elems.size();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return ensureDelegatee().hasNext();
+  }
+
+  /** @throws java.util.NoSuchElementException when the span is exhausted */
+  @Override
+  public WORK_ITEM next() {
+    return ensureDelegatee().next();
+  }
+
+  @Override
+  public String toString() {
+    return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems() + "})";
+  }
+
+  protected String getClassNickname() {
+    // return getClass().getSimpleName();
+    return "WorkSpan";
+  }
+
+  /** @return the stateful iterator over `elems`, creating it upon first use */
+  private Iterator<WORK_ITEM> ensureDelegatee() {
+    if (statefulDelegatee == null) {
+      statefulDelegatee = elems.iterator();
+    }
+    return statefulDelegatee;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java
new file mode 100644
index 000000000..8b649ac51
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.NoSuchElementException;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+
+/**
+ * Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection, *SHARED* w/ other work spans.
+ *
+ * The span covers up to `numElements` of `sharedElems`, beginning at `startingIndex`, but never beyond the end of `sharedElems`.
+ * Iteration is stateful and single-pass; it may begin with either {@link #hasNext()} or {@link #next()}.
+ */
+@NoArgsConstructor
+@RequiredArgsConstructor
+public class SeqSliceBackedWorkSpan<WORK_ITEM> implements Workload.WorkSpan<WORK_ITEM> {
+  private static final int NOT_SET_SENTINEL = -1;
+
+  @NonNull private WORK_ITEM[] sharedElems;
+  // CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"...
+  // if removed, no three-arg ctor is generated, so syntax error on `new SeqSliceBackedWorkSpan(elems, startIndex, numElements)`
+  @NonNull private int startingIndex;
+  @NonNull private int numElements;
+  private transient volatile int nextElemIndex = NOT_SET_SENTINEL;
+
+  /** @return the number of elements in this span, after truncation at the end of `sharedElems` (never negative) */
+  @Override
+  public int getNumElems() {
+    return Math.max(0, getEndingIndex() - startingIndex);
+  }
+
+  @Override
+  public boolean hasNext() {
+    ensureIterationStarted();
+    return nextElemIndex < this.getEndingIndex();
+  }
+
+  /** @throws NoSuchElementException when the span is exhausted (including when truncated by the end of `sharedElems`) */
+  @Override
+  public WORK_ITEM next() {
+    ensureIterationStarted(); // (so `next()` needn't be preceded by `hasNext()`)
+    final int endingIndex = getEndingIndex(); // (bounded by `sharedElems.length`, so never reading beyond the array)
+    if (nextElemIndex >= endingIndex) {
+      throw new NoSuchElementException("index " + nextElemIndex + " >= " + endingIndex + " (ending index)");
+    }
+    return sharedElems[nextElemIndex++];
+  }
+
+  @Override
+  public String toString() {
+    return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems() + "})";
+  }
+
+  protected String getClassNickname() {
+    // return getClass().getSimpleName();
+    return "WorkSpan";
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  protected final int getEndingIndex() {
+    // widen to `long`, so a huge `numElements` (e.g. `Integer.MAX_VALUE`) cannot overflow
+    return (int) Math.min((long) startingIndex + numElements, sharedElems.length);
+  }
+
+  /** Lazily position iteration at `startingIndex`, which should be effectively `final` (post-deser) and always >= 0 */
+  private void ensureIterationStarted() {
+    if (nextElemIndex == NOT_SET_SENTINEL) {
+      nextElemIndex = startingIndex;
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
new file mode 100644
index 000000000..0329d90d9
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Hierarchical address of a workflow within a tree of nesting workflows, rendered as `SEP`-delimited segments.
+ * Each segment is 0-based; the depth of an address is one less than its number of segments.
+ */
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkflowAddr {
+  public static final String SEP = ".";
+  private static final Joiner SEGMENT_JOINER = Joiner.on(SEP);
+
+  /** initial, top-level address */
+  public static final WorkflowAddr ROOT = new WorkflowAddr(0);
+
+  // IMPORTANT: jackson (de)serialization won't permit `final`, so `@NonNull` is what includes this in the `@RequiredArgsConstructor`
+  @Getter
+  @NonNull
+  private List<Integer> segments;
+
+  /** Creates a single-segment (i.e. depth 0) address */
+  public WorkflowAddr(final int firstLevelOnly) {
+    this(Lists.newArrayList(firstLevelOnly));
+  }
+
+  /** @return 0-based depth */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public int getDepth() {
+    final int numSegments = segments.size();
+    return numSegments - 1;
+  }
+
+  /** @return a new child address: a copy of this one's segments, extended by `childLevel` (this address is unchanged) */
+  public WorkflowAddr createChild(int childLevel) {
+    final List<Integer> childSegments = new ArrayList<>(segments.size() + 1);
+    childSegments.addAll(segments);
+    childSegments.add(childLevel);
+    return new WorkflowAddr(childSegments);
+  }
+
+  @Override
+  public String toString() {
+    return SEGMENT_JOINER.join(segments);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
new file mode 100644
index 000000000..239825f7c
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import java.util.Iterator;
+import java.util.Optional;
+
+
+/**
+ * {@link Workload} models a logical collection of homogeneous inputs over which a "foreach" operation can asynchronously apply
+ * an arbitrary procedure to each element.  This encapsulates "processing" the entire collection of sequential
+ * "work item" specifications by the uniform application of the chosen procedure(s).
+ *
+ * Given Temporal's required determinism, the work items and work spans should remain unchanged, with stable sequential
+ * ordering.  This need not constrain `Workload`s to eager, advance elaboration: "streaming" definition is possible,
+ * so long as it produces a deterministic result.
+ *
+ * An actual, real-world workload might correspond to datastore contents, such as records serialized into HDFS files
+ * or ordered DB query results.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls
+public interface Workload<WORK_ITEM> {
+
+  /**
+   * Obtain a sequential sub-sequence of work items.
+   *
+   * NOTE: this is a blocking call that forces elaboration: `WorkSpan.getNumElems() < numElements` signifies end of seq
+   *
+   * @param startIndex 0-based index of the first work item in the span
+   * @param numElements maximum number of work items the span may hold
+   * @return the span beginning at `startIndex`, unless that falls beyond the underlying sequence
+   */
+  Optional<WorkSpan<WORK_ITEM>> getSpan(int startIndex, int numElements);
+
+  /**
+   * Non-blocking, best-effort advice, to support non-strict elaboration.
+   * @return `true` when `index` is already known to exceed the sequence; `false` does NOT guarantee that it will not
+   */
+  boolean isIndexKnownToExceed(int index);
+
+  /** @return whether this workload's size is definite; `false` by default */
+  default boolean isDefiniteSize() {
+    return false;
+  }
+
+  /** Logical sub-sequence 'slice' of contiguous work items */
+  interface WorkSpan<T> extends Iterator<T> {
+    /** @return the number of work items in this span */
+    int getNumElems();
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
new file mode 100644
index 000000000..0dcf19a77
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.workflow;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.Lists;
+
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only 
define {@link #launchAsyncActivity} */
+@Slf4j
+public abstract class AbstractNestingExecWorkflowImpl<WORK_ITEM, 
ACTIVITY_RESULT> implements NestingExecWorkflow<WORK_ITEM> {
+  public static final int 
NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT = 10;
+  public static final int 
MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100;
+
+  @Override
+  public int performWorkload(
+      final WorkflowAddr addr,
+      final Workload<WORK_ITEM> workload,
+      final int startIndex,
+      final int maxBranchesPerTree,
+      final int maxSubTreesPerTree,
+      final Optional<Integer> maxSubTreesForCurrentTreeOverride
+  ) {
+    final int maxSubTreesForCurrent = 
maxSubTreesForCurrentTreeOverride.orElse(maxSubTreesPerTree);
+    final int maxLeaves = maxBranchesPerTree - maxSubTreesForCurrent;
+    final Optional<Workload.WorkSpan<WORK_ITEM>> optSpan = 
workload.getSpan(startIndex, maxLeaves);
+    log.info("[" + addr + "] " + workload + " w/ start '" + startIndex + "'"
+        + "; tree (" + maxBranchesPerTree + "/" + maxSubTreesPerTree + "): " + 
optSpan);
+    if (!optSpan.isPresent()) {
+      return 0;
+    } else {
+      final Workload.WorkSpan<WORK_ITEM> workSpan = optSpan.get();
+      final Iterable<WORK_ITEM> iterable = () -> workSpan;
+      final List<Promise<ACTIVITY_RESULT>> childActivities = 
StreamSupport.stream(iterable.spliterator(), false)
+          .map(t -> launchAsyncActivity(t))
+          .collect(Collectors.toList());
+      final List<Promise<Integer>> childSubTrees = new ArrayList<>();
+      if (workSpan.getNumElems() == maxLeaves) { // received as many as 
requested (did not stop short)
+        int subTreeId = 0;
+        for (int subTreeChildMaxSubTreesPerTree
+            : consolidateSubTreeGrandChildren(maxSubTreesForCurrent, 
maxBranchesPerTree, maxSubTreesPerTree)) {
+          // CAUTION: calc these *before* incrementing `subTreeId`!
+          final int childStartIndex = startIndex + maxLeaves + 
(maxBranchesPerTree * subTreeId);
+          final int nextChildId = maxLeaves + subTreeId;
+          final WorkflowAddr childAddr = addr.createChild(nextChildId);
+          final NestingExecWorkflow<WORK_ITEM> child = 
createChildWorkflow(childAddr);
+          if (!workload.isIndexKnownToExceed(childStartIndex)) { // 
best-effort short-circuiting
+            // IMPORTANT: insert pause before launch of each child workflow 
that may have direct leaves of its own.  periodic pauses spread the load on the
+            // temporal server, to avoid a sustained burst from submitting 
potentially very many async activities over the full hierarchical elaboration
+            final int numDirectLeavesChildMayHave = maxBranchesPerTree - 
subTreeChildMaxSubTreesPerTree;
+            if (numDirectLeavesChildMayHave > 0) {
+              
Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave));
+            }
+            childSubTrees.add(
+                Async.function(child::performWorkload, childAddr, workload, 
childStartIndex, maxBranchesPerTree,
+                    maxSubTreesPerTree, 
Optional.of(subTreeChildMaxSubTreesPerTree)));
+            ++subTreeId;
+          }
+        }
+      }
+      final Promise<Void> allActivityChildren = Promise.allOf(childActivities);
+      allActivityChildren.get(); // ensure all complete prior to counting them 
in `overallActivitiesRollupCount`
+      // TODO: determine whether any benefit to unordered `::get` blocking for 
any next ready (perhaps no difference...)
+      final int descendantActivitiesRollupCount = 
childSubTrees.stream().map(Promise::get).reduce(0, (x, y) -> x + y);
+      // TODO: consider a generalized reduce op for things other than counting!
+      final int overallActivitiesRollupCount = workSpan.getNumElems() + 
descendantActivitiesRollupCount;
+      log.info("[" + addr + "] activites finished coordinating: " + 
overallActivitiesRollupCount);
+      return overallActivitiesRollupCount;
+    }
+  }
+
+  /** Factory for invoking the specific activity by providing it args via 
{@link Async::function} */
+  protected abstract Promise<ACTIVITY_RESULT> launchAsyncActivity(WORK_ITEM 
task);
+
+  protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final 
WorkflowAddr childAddr) {
+    // preserve the current workflow ID of this parent, but add the 
(hierarchical) address extension specific to each child
+    String thisWorkflowId = Workflow.getInfo().getWorkflowId();
+    String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + 
childAddr;
+    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setWorkflowId(childWorkflowId)
+        .build();
+    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+  }
+
+  /** @return how long to pause prior to creating a child workflow, based on 
`numDirectLeavesChildMayHave` */
+  protected Duration calcPauseDurationBeforeCreatingSubTree(int 
numDirectLeavesChildMayHave) {
+    // (only pause when an appreciable number of leaves)
+    // TODO: use a configuration value, for simpler adjustment, rather than 
hard-code
+    return numDirectLeavesChildMayHave > 
MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
+        ? 
Duration.ofSeconds(NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT)
+        : Duration.ZERO;
+  }
+
+  /**
+   * "right-tilt" sub-tree's grandchildren, so final child gets all 
grandchildren (vs. constant grandchildren/child)
+   *   i.e. NOT!:
+   *     List<Integer> naiveUniformity = 
Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren);
+   * @return each sub-tree's desired size, in ascending sub-tree order
+   */
+  protected static List<Integer> consolidateSubTreeGrandChildren(
+      final int numSubTreesPerSubTree,
+      final int numChildrenTotal,
+      final int numSubTreeChildren
+  ) {
+    if (numSubTreesPerSubTree <= 0) {
+      return Lists.newArrayList();
+    } else if (isSqrt(numSubTreeChildren, numChildrenTotal)) {
+      // redistribute all grandchild sub-trees to pack every grandchild 
beneath the final child sub-tree
+      final List<Integer> grandChildCounts = new 
ArrayList<>(Collections.nCopies(numSubTreesPerSubTree - 1, 0));
+      grandChildCounts.add(numChildrenTotal);
+      return grandChildCounts;
+    } else {
+      final int totalGrandChildSubTrees = numSubTreesPerSubTree * 
numSubTreeChildren;
+      final int numTreesWithSolelySubTreeBranches = totalGrandChildSubTrees / 
numChildrenTotal;
+      final int numSubTreesRemaining = totalGrandChildSubTrees % 
numChildrenTotal;
+      assert (numTreesWithSolelySubTreeBranches == 1 && numSubTreesRemaining 
== 0) || numTreesWithSolelySubTreeBranches == 0
+          : "present limitation: at most one sub-tree may use further 
branching: (found: numSubTreesPerSubTree: "
+          + numSubTreesPerSubTree + "; numChildrenTotal: " + numChildrenTotal 
+ " / numSubTreeChildren: "
+          + numSubTreeChildren + ")";
+      final List<Integer> grandChildCounts = new 
ArrayList<>(Collections.nCopies(numSubTreesPerSubTree - 
(numTreesWithSolelySubTreeBranches + 1), 0));
+      grandChildCounts.addAll(Collections.nCopies(Math.min(1, 
numSubTreesPerSubTree - numTreesWithSolelySubTreeBranches), 
numSubTreesRemaining));
+      
grandChildCounts.addAll(Collections.nCopies(Math.min(numTreesWithSolelySubTreeBranches,
 numSubTreesPerSubTree), numChildrenTotal));
+      return grandChildCounts;
+    }
+  }
+
+  /** @return whether `maxSubTrees` == `Math.sqrt(maxBranches)` */
+  private static boolean isSqrt(int maxSubTrees, int maxBranches) {
+    return maxSubTrees > 0 && maxSubTrees * maxSubTrees == maxBranches;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
new file mode 100644
index 000000000..3a6661d09
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.workflow;
+
+import java.util.Optional;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/**
+ * Process all `WORK_ITEM`s of {@link Workload}, from `startIndex` to the end 
by creating child workflows, where this and
+ * descendants should have at most `maxBranchesPerTree`, with at most 
`maxSubTreesPerTree` of those being child
+ * workflows.  (Non-child-workflow (terminal) branches are the activity 
executions.)
+ *
+ * The underlying motivation is to create logical workflows of unbounded size, 
despite Temporal's event history limit
+ * of 50Ki events; see: https://docs.temporal.io/workflows#event-history
+ *
+ * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a 
good rule-of-thumb; `maxSubTreesPerTree
+ * must not exceed that.  This enables consolidation, wherein continued 
expansion occurs only along the tree's right-most edges.
+ *
+ * @param <WORK_ITEM> the type of task for which to invoke an appropriate 
activity
+ * @param maxSubTreesForCurrentTreeOverride when the current tree should use 
different max sub-trees than descendants
+ */
+@WorkflowInterface
+public interface NestingExecWorkflow<WORK_ITEM> {
+  /** @return the number of workload elements processed cumulatively by this 
Workflow and its children */
+  @WorkflowMethod
+  int performWorkload(
+      WorkflowAddr addr,
+      Workload<WORK_ITEM> workload,
+      int startIndex,
+      int maxBranchesPerTree,
+      int maxSubTreesPerTree,
+      Optional<Integer> maxSubTreesForCurrentTreeOverride
+  );
+}


Reply via email to