This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 96092f75d [GOBBLIN-1944] Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath (#3815)
96092f75d is described below

commit 96092f75d34d5839c9b72fc75fdea87c84e93b62
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Oct 31 19:01:24 2023 -0700

    [GOBBLIN-1944] Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath (#3815)
    
    * Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath
    
    * Update per findbugs advice
    
    * Improve processing of int props
---
 .../loadgen/activity/IllustrationItemActivity.java | 38 ++++++++++
 .../impl/IllustrationItemActivityImpl.java         | 33 +++++++++
 .../launcher/GenArbitraryLoadJobLauncher.java      | 85 ++++++++++++++++++++++
 .../temporal/loadgen/work/IllustrationItem.java    | 33 +++++++++
 .../loadgen/work/SimpleGeneratedWorkload.java      | 64 ++++++++++++++++
 .../loadgen/worker/ArbitraryLoadWorker.java        | 42 +++++++++++
 ...ExecOfIllustrationItemActivityWorkflowImpl.java | 55 ++++++++++++++
 .../org/apache/gobblin/util/PropertiesUtils.java   |  7 ++
 8 files changed, 357 insertions(+)

diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/IllustrationItemActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/IllustrationItemActivity.java
new file mode 100644
index 000000000..a73c845d8
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/IllustrationItemActivity.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
+
+
+/**
+ * Activity for processing {@link IllustrationItem}s
+ *
+ * CAUTION/FINDING: an `@ActivityInterface` must not be parameterized (e.g. here, by WORK_ITEM), as doing so results in:
+ *   io.temporal.failure.ApplicationFailure: message='class java.util.LinkedHashMap cannot be cast to class
+ *       org.apache.gobblin.temporal.loadgen.work.IllustrationItem', type='java.lang.ClassCastException'
+ */
+@ActivityInterface
+public interface IllustrationItemActivity {
+  @ActivityMethod
+  String handleItem(IllustrationItem item);
+}
+
+
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/impl/IllustrationItemActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/impl/IllustrationItemActivityImpl.java
new file mode 100644
index 000000000..70e6b77a7
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/impl/IllustrationItemActivityImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.activity.impl;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity;
+import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
+
+
+@Slf4j
+public class IllustrationItemActivityImpl implements IllustrationItemActivity {
+  @Override
+  public String handleItem(final IllustrationItem item) {
+    log.info("Now illustrating - '" + item.getName() + "'");
+    return item.getName();
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
new file mode 100644
index 000000000..62a7e4cfe
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.launcher;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import io.temporal.client.WorkflowOptions;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
+import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+import org.apache.gobblin.util.PropertiesUtils;
+
+import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow that generates arbitrary load of many
+ * activities nested beneath a single subsuming super-workflow.  see: {@link NestingExecWorkflow}
+ *
+ * <p>
+ *   This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Alpha
+@Slf4j
+public class GenArbitraryLoadJobLauncher extends GobblinTemporalJobLauncher {
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NUM_ACTIVITIES = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "num.activities";
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_BRANCHES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "max.branches.per.tree";
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "max.sub.trees.per.tree";
+
+  public GenArbitraryLoadJobLauncher(
+      Properties jobProps,
+      Path appWorkDir,
+      List<? extends Tag<?>> metadataTags,
+      ConcurrentHashMap<String, Boolean> runningMap
+  ) throws Exception {
+    super(jobProps, appWorkDir, metadataTags, runningMap);
+  }
+
+  @Override
+  public void submitJob(List<WorkUnit> workunits) {
+    int numActivities = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NUM_ACTIVITIES);
+    int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_BRANCHES_PER_TREE);
+    int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE);
+
+    Workload<IllustrationItem> workload = SimpleGeneratedWorkload.createAs(numActivities);
+    WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(this.queueName).build();
+
+    // WARNING: although type param must agree w/ that of `workload`, it's entirely unverified by type checker!
+    // ...and more to the point, mismatch would occur at runtime (`performWorkload` on the workflow type given to the stub)!
+    NestingExecWorkflow<IllustrationItem> workflow = this.client.newWorkflowStub(NestingExecWorkflow.class, options);
+
+    workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty());
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/IllustrationItem.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/IllustrationItem.java
new file mode 100644
index 000000000..aec6a6ffb
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/IllustrationItem.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.work;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Generally, this would specify what "work" needs performing plus how to perform, but for now merely a unique name (to log) */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class IllustrationItem {
+  @NonNull
+  private String name;
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/SimpleGeneratedWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/SimpleGeneratedWorkload.java
new file mode 100644
index 000000000..78bbecb20
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/SimpleGeneratedWorkload.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.work;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.AccessLevel;
+import org.apache.gobblin.temporal.util.nesting.work.SeqBackedWorkSpan;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/** Example, illustration workload that synthesizes its work items; genuine {@link Workload}s generally arise from query/calc */
+@lombok.AllArgsConstructor(access = AccessLevel.PRIVATE)
+@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
[email protected]
+public class SimpleGeneratedWorkload implements Workload<IllustrationItem> {
+  private int numItems;
+
+  /** Factory method */
+  public static SimpleGeneratedWorkload createAs(final int numItems) {
+    return new SimpleGeneratedWorkload(numItems);
+  }
+
+  @Override
+  public Optional<Workload.WorkSpan<IllustrationItem>> getSpan(final int startIndex, final int numElements) {
+    if (startIndex >= numItems || startIndex < 0) {
+      return Optional.empty();
+    } else {
+      List<IllustrationItem> elems = IntStream.range(startIndex, Math.min(startIndex + numElements, numItems))
+          .mapToObj(n -> new IllustrationItem("item-" + n + "-of-" + numItems))
+          .collect(Collectors.toList());
+      return Optional.of(new SeqBackedWorkSpan<>(elems, startIndex));
+    }
+  }
+
+  @Override
+  public boolean isIndexKnownToExceed(final int index) {
+    return isDefiniteSize() && index >= numItems;
+  }
+
+  @Override
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public boolean isDefiniteSize() {
+    return true;
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/worker/ArbitraryLoadWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/worker/ArbitraryLoadWorker.java
new file mode 100644
index 000000000..296356190
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/worker/ArbitraryLoadWorker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.worker;
+
+import com.typesafe.config.Config;
+import io.temporal.client.WorkflowClient;
+import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.loadgen.activity.impl.IllustrationItemActivityImpl;
+import org.apache.gobblin.temporal.loadgen.workflow.impl.NestingExecOfIllustrationItemActivityWorkflowImpl;
+
+
+/** Worker for {@link NestingExecOfIllustrationItemActivityWorkflowImpl} and said activity impl */
+public class ArbitraryLoadWorker extends AbstractTemporalWorker {
+  public ArbitraryLoadWorker(Config config, WorkflowClient workflowClient) {
+    super(config, workflowClient);
+  }
+
+  @Override
+  protected Class<?>[] getWorkflowImplClasses() {
+    return new Class[] { NestingExecOfIllustrationItemActivityWorkflowImpl.class };
+  }
+
+  @Override
+  protected Object[] getActivityImplInstances() {
+    return new Object[] { new IllustrationItemActivityImpl() };
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
new file mode 100644
index 000000000..4346eecfd
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+import java.time.Duration;
+import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity;
+import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
+import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
+
+
+/** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link IllustrationItem} */
+public class NestingExecOfIllustrationItemActivityWorkflowImpl
+    extends AbstractNestingExecWorkflowImpl<IllustrationItem, String> {
+
+  // RetryOptions specify how to automatically handle retries when Activities fail.
+  private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(1))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(3)
+      .build();
+
+  private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofSeconds(10))
+      .setRetryOptions(ACTIVITY_RETRY_OPTS)
+      .build();
+
+  private final IllustrationItemActivity activityStub =
+      Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS);
+
+  @Override
+  protected Promise<String> launchAsyncActivity(final IllustrationItem item) {
+    return Async.function(activityStub::handleItem, item);
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index bb53404d8..d6361b927 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -70,6 +70,13 @@ public class PropertiesUtils {
     return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue)));
   }
 
+  /** @throws {@link NullPointerException} when `key` not in `properties` */
+  public static int getRequiredPropAsInt(Properties properties, String key) {
+    String value = properties.getProperty(key);
+    Preconditions.checkNotNull(value, "'" + key + "' must be set (to an integer)");
+    return Integer.parseInt(value);
+  }
+
   public static long getPropAsLong(Properties properties, String key, long defaultValue) {
     return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
   }

Reply via email to