[
https://issues.apache.org/jira/browse/GOBBLIN-1941?focusedWorklogId=887657&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887657
]
ASF GitHub Bot logged work on GOBBLIN-1941:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Oct/23 22:41
Start Date: 27/Oct/23 22:41
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3811:
URL: https://github.com/apache/gobblin/pull/3811#discussion_r1375097940
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Hierarchical address for nesting workflows (0-based). */
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WFAddr {
Review Comment:
Is it okay if we name it `WorkFlowAddr`? Shortening it seems unnecessary here,
given that we refer to work and workflows in other classes without shortening
them.
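For illustration only, a renamed class might look something like the following. This is a hypothetical sketch, not the PR's implementation. The diff shows only the class declaration, so the `segments` field, `createChild`, `getDepth`, and the dot-separated `toString` are all assumptions based on the Javadoc's description of a 0-based hierarchical address. It uses plain `java.util` rather than the Lombok and Guava utilities imported in the diff, so it compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** HYPOTHETICAL sketch: hierarchical, 0-based address of a nested workflow, rendered like "0.3.1". */
class WorkFlowAddr {
  // path of 0-based child indices from the root; empty for the root itself
  private final List<Integer> segments;

  /** the root address */
  WorkFlowAddr() {
    this(Collections.emptyList());
  }

  private WorkFlowAddr(List<Integer> segments) {
    // defensive copy, so addresses are immutable once created
    this.segments = Collections.unmodifiableList(new ArrayList<>(segments));
  }

  /** @return a NEW address for this workflow's `childIndex`-th (0-based) child; `this` is unchanged */
  WorkFlowAddr createChild(int childIndex) {
    if (childIndex < 0) {
      throw new IllegalArgumentException("childIndex must be >= 0: " + childIndex);
    }
    List<Integer> childSegments = new ArrayList<>(segments);
    childSegments.add(childIndex);
    return new WorkFlowAddr(childSegments);
  }

  /** nesting depth: 0 for the root */
  int getDepth() {
    return segments.size();
  }

  @Override
  public String toString() {
    return segments.stream().map(String::valueOf).collect(Collectors.joining("."));
  }
}
```

With this sketch, `new WorkFlowAddr().createChild(0).createChild(3)` renders as `0.3` and has depth 2.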
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.Iterator;
+import java.util.List;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection */
+@NoArgsConstructor
+@RequiredArgsConstructor
+public class SeqBackedWorkSpan<WORK_ITEM> implements Workload.WorkSpan<WORK_ITEM> {
+
+ @NonNull
+ private List<WORK_ITEM> elems;
+ // CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"...
+ // if removed, no two-arg ctor is generated, so syntax error on `new CollectionBackedTaskSpan(elems, startIndex)`
+ @NonNull
+ private int startingIndex;
+ private transient Iterator<WORK_ITEM> statefulDelegatee = null;
+
+ @Override
+ public int getNumElems() {
+ return elems.size();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (statefulDelegatee == null) {
+ statefulDelegatee = elems.iterator();
+ }
+ return statefulDelegatee.hasNext();
+ }
+
+ @Override
+ public WORK_ITEM next() {
+ if (statefulDelegatee == null) {
+ throw new IllegalStateException("first call `hasNext()`!");
Review Comment:
Is it typical for iterators to require that `next()` be called only after
`hasNext()`? When I used this class previously, I was tripped up here, since I
expected `next()` to fail only when no elements remain in the list.
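For reference, the `java.util.Iterator` contract does not require calling `hasNext()` first: `next()` is specified to throw `NoSuchElementException` only when the iteration has no more elements. Here is a hypothetical sketch of a lazily initialized delegate that honors that contract. The class name `LazySeqSpan` and its getters are illustrative, and it implements `Iterator` directly because the `Workload.WorkSpan` interface isn't shown in this diff.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
 * HYPOTHETICAL sketch, not the PR's code: a list-backed span whose `next()` may be called
 * without a preceding `hasNext()`, as the `java.util.Iterator` contract permits.
 */
class LazySeqSpan<WORK_ITEM> implements Iterator<WORK_ITEM> {
  private List<WORK_ITEM> elems;
  private int startingIndex;
  private transient Iterator<WORK_ITEM> statefulDelegatee = null;

  /** no-arg ctor, e.g. for (de)serialization frameworks that populate fields afterward */
  LazySeqSpan() {}

  /** explicit two-arg ctor, so no `@NonNull`-on-a-primitive workaround is needed */
  LazySeqSpan(List<WORK_ITEM> elems, int startingIndex) {
    this.elems = Objects.requireNonNull(elems, "elems");
    this.startingIndex = startingIndex;
  }

  int getNumElems() {
    return elems.size();
  }

  int getStartingIndex() {
    return startingIndex;
  }

  /** lazily creates the delegate, so either `hasNext()` or `next()` may be called first */
  private Iterator<WORK_ITEM> delegatee() {
    if (statefulDelegatee == null) {
      statefulDelegatee = elems.iterator();
    }
    return statefulDelegatee;
  }

  @Override
  public boolean hasNext() {
    return delegatee().hasNext();
  }

  @Override
  public WORK_ITEM next() {
    // standard `List` iterators throw `NoSuchElementException` once exhausted, per the `Iterator` contract
    return delegatee().next();
  }
}
```

Writing the two constructors by hand (instead of relying on Lombok's `@RequiredArgsConstructor`) also sidesteps the `@NonNull`-on-a-primitive caveat noted in the diff.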
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java:
##########
@@ -0,0 +1,146 @@
+
+package org.apache.gobblin.temporal.util.nesting.workflow;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.Lists;
+
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.util.nesting.work.WFAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only define {@link #launchAsyncActivity} */
Review Comment:
Could we add more documentation on the expected values for
`maxBranchesPerTree`, `maxSubTreesPerTree`, etc., so that callers can produce a
more balanced tree?
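As a hedged illustration of what such documentation could pin down, here is a hypothetical validator derived only from the rule of thumb stated in the `NestingExecWorkflow` Javadoc (`maxSubTreesPerTree` must not exceed `Math.sqrt(maxBranchesPerTree)`). The class and method names are made up for this sketch. The activity-branch count assumes that a tree filling all of its sub-tree slots leaves its remaining branches to activity executions, as the Javadoc's description of terminal branches suggests.

```java
/**
 * HYPOTHETICAL helper, not part of the PR: validates nesting parameters against the rule of thumb
 * in the `NestingExecWorkflow` Javadoc (`maxSubTreesPerTree` must not exceed `sqrt(maxBranchesPerTree)`).
 */
final class NestingParams {
  final int maxBranchesPerTree;
  final int maxSubTreesPerTree;

  NestingParams(int maxBranchesPerTree, int maxSubTreesPerTree) {
    if (maxBranchesPerTree <= 0 || maxSubTreesPerTree < 0) {
      throw new IllegalArgumentException("maxBranchesPerTree must be > 0 and maxSubTreesPerTree >= 0");
    }
    // compare squares in `long` arithmetic, avoiding both floating-point `Math.sqrt` and `int` overflow
    if ((long) maxSubTreesPerTree * maxSubTreesPerTree > maxBranchesPerTree) {
      throw new IllegalArgumentException("maxSubTreesPerTree (" + maxSubTreesPerTree
          + ") exceeds sqrt(maxBranchesPerTree) for maxBranchesPerTree = " + maxBranchesPerTree);
    }
    this.maxBranchesPerTree = maxBranchesPerTree;
    this.maxSubTreesPerTree = maxSubTreesPerTree;
  }

  /** ASSUMPTION: a tree using every sub-tree slot leaves the remaining branches to activities */
  int activityBranchesPerFullTree() {
    return maxBranchesPerTree - maxSubTreesPerTree;
  }

  /** whether the parameters hit the rule of thumb exactly (`sqrt(maxBranchesPerTree) == maxSubTreesPerTree`) */
  boolean matchesRuleOfThumbExactly() {
    return (long) maxSubTreesPerTree * maxSubTreesPerTree == maxBranchesPerTree;
  }
}
```

Under these assumptions, `maxBranchesPerTree = 100` with `maxSubTreesPerTree = 10` leaves 90 activity branches per full tree, while 11 sub-trees would be rejected because 11 * 11 > 100.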
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java:
##########
@@ -0,0 +1,54 @@
+
+package org.apache.gobblin.temporal.util.nesting.workflow;
+
+import java.util.Optional;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.util.nesting.work.WFAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/**
+ * Process all `WORK_ITEM`s of {@link Workload}, from `startIndex` to the end, by creating child workflows, where this and
+ * descendants should have at most `maxBranchesPerTree` branches, with at most `maxSubTreesPerTree` of those being child
+ * workflows. (Non-child-workflow (terminal) branches are the activity executions.)
+ *
+ * The underlying motivation is to create logical workflows of unbounded size, despite Temporal's event history limit
+ * of 50Ki events; see: https://docs.temporal.io/workflows#event-history
+ *
+ * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a good rule-of-thumb; `maxSubTreesPerTree`
+ * must not exceed that. This enables consolidation, wherein continued expansion occurs only along the tree's right-most edges.
+ *
+ * @param <WORK_ITEM> the type of task for which to invoke an appropriate activity
+ * @param maxSubTreesForCurrentTreeOverride when the current tree should use different max sub-trees than descendants
+ */
+@WorkflowInterface
+public interface NestingExecWorkflow<WORK_ITEM> {
+ @WorkflowMethod
+ int performWorkload(
Review Comment:
Can we add a javadoc for what the return value should represent?
Issue Time Tracking
-------------------
Worklog Id: (was: 887657)
Time Spent: 20m (was: 10m)
> Create abstractions for developing on Temporal
> ----------------------------------------------
>
> Key: GOBBLIN-1941
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1941
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Develop abstractions to assist in implementing Gobblin-on-Temporal, including
> abstractions to overcome Temporal's limit of 50Ki events per workflow history:
> https://docs.temporal.io/workflows#event-history
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)