phet commented on code in PR #3778:
URL: https://github.com/apache/gobblin/pull/3778#discussion_r1329618092


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java:
##########
@@ -79,7 +79,8 @@
  * The central cluster manager for Gobblin Clusters.
  *
  * <p>
- *   This class runs the {@link GobblinHelixJobScheduler} for scheduling and running Gobblin jobs.
+ *   This class runs the {@link GobblinHelixJobScheduler} for scheduling
+ *   and running Gobblin jobs.

Review Comment:
   if we're not changing anything here, leave this file off the commit.
   
   non-substantive, formatting-only changes are OK, but belong in a separate commit



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -234,5 +234,7 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
   public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = Duration.ofMinutes(40).getSeconds();;
-
+  public static final String TEMPORAL_WORKER_SIZE = "temporal.worker.size";

Review Comment:
   let's choose an unambiguous name.  is this num workers or size of each one?  if the latter, what units are we discussing (e.g. MB of memory)?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflowImpl.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+import java.time.Duration;
+
+/** {@link com.linkedin.temporal.app.workflow.nesting.NestingExecWorkflow} for {@link IllustrationTask} */
+public class NestingExecWorkflowImpl
+        extends AbstractNestingExecWorkflowImpl<IllustrationTask, String> {

Review Comment:
   naming-wise, this is really just one among a family of `NestingExecWorkflow` impls, the one for `IllustrationTask`s.
   
   the class name should likely reflect that



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java:
##########
@@ -38,7 +40,7 @@
  *
  */
 public class EventSubmitter {
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(EventSubmitter.class);

Review Comment:
   `@lombok.extern.slf4j.Slf4j` will do this for you... but why do we need this?  (I don't see `LOGGER` used anywhere)



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -234,5 +234,7 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
   public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = Duration.ofMinutes(40).getSeconds();;
-
+  public static final String TEMPORAL_WORKER_SIZE = "temporal.worker.size";
+  public static final String TEMPORAL_ENABLED = "temporal.enabled";

Review Comment:
   so many of the other keys use `GOBBLIN_CLUSTER_PREFIX`; shouldn't this one?
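   a minimal sketch of what this and the earlier naming comment could add up to, assuming `GOBBLIN_CLUSTER_PREFIX` is `"gobblin.cluster."` as used elsewhere in `GobblinClusterConfigurationKeys`. all key names and defaults below are hypothetical; the point is a prefixed name that says whether it's a count or a size (and, if a size, its unit).

   ```java
   import java.util.Properties;

   /** Hypothetical sketch: prefixed config keys whose names carry their meaning and units. */
   class TemporalConfigKeysSketch {
     // Assumed to match GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX.
     static final String GOBBLIN_CLUSTER_PREFIX = "gobblin.cluster.";

     // Hypothetical, if the setting is a count: say so with "num".
     static final String TEMPORAL_NUM_WORKERS_PER_CONTAINER =
         GOBBLIN_CLUSTER_PREFIX + "temporal.numWorkersPerContainer";
     // Hypothetical, if the setting is a size: put the unit in the name.
     static final String TEMPORAL_WORKER_MEMORY_MBS =
         GOBBLIN_CLUSTER_PREFIX + "temporal.worker.memory.mbs";
     // Same flag as in the PR, but under the shared prefix.
     static final String TEMPORAL_ENABLED = GOBBLIN_CLUSTER_PREFIX + "temporal.enabled";

     static int getInt(Properties props, String key, int defaultValue) {
       String value = props.getProperty(key);
       return value == null ? defaultValue : Integer.parseInt(value.trim());
     }

     static boolean getBoolean(Properties props, String key, boolean defaultValue) {
       String value = props.getProperty(key);
       return value == null ? defaultValue : Boolean.parseBoolean(value.trim());
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       props.setProperty(TEMPORAL_ENABLED, "true");
       props.setProperty(TEMPORAL_WORKER_MEMORY_MBS, "4096");
       System.out.println(getBoolean(props, TEMPORAL_ENABLED, false));           // true
       System.out.println(getInt(props, TEMPORAL_NUM_WORKERS_PER_CONTAINER, 1)); // 1 (default)
       System.out.println(getInt(props, TEMPORAL_WORKER_MEMORY_MBS, 1024));      // 4096
     }
   }
   ```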



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinJobLauncher.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.JobEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.rest.LauncherTypeEnum;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.SerializationUtils;
+
+/**
+ * An implementation of {@link JobLauncher} that launches a Gobblin job using the Temporal task framework.

Review Comment:
   it sounds here like temporal is the fundamental proposition, but I don't immediately see where it comes in within this class def.  is the class actually more general?  please update the javadoc to clarify where that stands



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java:
##########
@@ -28,7 +28,7 @@
 /**
  * Metrics that relates to jobs launched by {@link GobblinHelixJobLauncher}.
  */
-class GobblinHelixJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics {
+public class GobblinHelixJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics {

Review Comment:
   I understand the need to call from the `temporal` sub-package you've created... but that sidesteps the potential confusion of using "helix-named" classes, when we're not actually involved w/ helix at runtime.
   
   a good compromise for this PR might be to add a TODO to all three--this, the job scheduler metrics, and the `GobblinHelixJobLauncherListener`--about renaming to be helix/temporal agnostic... (or, if merited, to instead create a separate temporal-specific variant)



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java:
##########
@@ -151,4 +151,5 @@ public class GobblinYarnConfigurationKeys {
   //Config to control Heartbeat interval for Yarn AMRM client.
   public static final String AMRM_HEARTBEAT_INTERVAL_SECS = GOBBLIN_YARN_PREFIX + "amRmHeartbeatIntervalSecs";
   public static final Integer DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS = 15;
+  public static final String TEMPORAL_WORKERPOOL_SIZE = "temporal.workerpool.size";

Review Comment:
   is the worker pool synonymous w/ the number of yarn containers?  are they per-container thread pools?  let's use a name to clearly convey exactly what's being configured
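   one way to make this unambiguous, sketched with hypothetical key names and defaults (and assuming `GOBBLIN_YARN_PREFIX` is `"gobblin.yarn."`): give the container count and the per-container thread count separate keys, so the resulting total concurrency is explicit rather than implied.

   ```java
   import java.util.Properties;

   /** Hypothetical sketch: separate keys for container count and per-container thread pool size. */
   class TemporalWorkerPoolConfigSketch {
     // Assumed to match GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX.
     static final String GOBBLIN_YARN_PREFIX = "gobblin.yarn.";

     // Hypothetical: number of YARN containers that each host a Temporal worker.
     static final String TEMPORAL_NUM_WORKER_CONTAINERS =
         GOBBLIN_YARN_PREFIX + "temporal.numWorkerContainers";
     // Hypothetical: number of threads in each container's worker pool.
     static final String TEMPORAL_NUM_THREADS_PER_CONTAINER =
         GOBBLIN_YARN_PREFIX + "temporal.numThreadsPerContainer";

     static int getInt(Properties props, String key, int defaultValue) {
       String value = props.getProperty(key);
       return value == null ? defaultValue : Integer.parseInt(value.trim());
     }

     /** Total concurrency is the product of both settings; one "pool size" key hides which it is. */
     static int totalWorkerThreads(Properties props) {
       return getInt(props, TEMPORAL_NUM_WORKER_CONTAINERS, 1)
           * getInt(props, TEMPORAL_NUM_THREADS_PER_CONTAINER, 1);
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       props.setProperty(TEMPORAL_NUM_WORKER_CONTAINERS, "4");
       props.setProperty(TEMPORAL_NUM_THREADS_PER_CONTAINER, "8");
       System.out.println(totalWorkerThreads(props)); // 32
     }
   }
   ```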



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/AbstractNestingExecWorkflowImpl.java:
##########


Review Comment:
   this and a bunch of other files representing cumulatively a specific worker w/ its particular workflows and activities (plus supporting abstractions, like `Workload` and `WFAddr`)--let's put them into their own package, separate from the worker-agnostic scaffolding to run arbitrary workers.
   
   our next follow-on PR will be to load the specific worker based on config (using reflection), so separating today's "example worker" from the reusable scaffolding anticipates that future.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/AbstractTemporalWorker.java:
##########
@@ -0,0 +1,42 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.Worker;
+import io.temporal.worker.WorkerOptions;
+import io.temporal.worker.WorkerFactory;
+public abstract class AbstractTemporalWorker {

Review Comment:
   despite my call to segregate worker-specific logic from overall scaffolding, this may be the one class that belongs in the package w/ the latter.
   
   to be most useful however, we probably want to define:
   ```
   interface TemporalWorker {
     void start();
     void shutdown();
   }
   ```
   and have this one implement that.  `TemporalWorker` would serve as the type we expect arbitrary, reflection-loaded workers to conform to.
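   a sketch of how that interface could pair with config-driven loading via reflection. `TemporalWorkerLoader` and `ExampleTemporalWorker` are hypothetical stand-ins (a real worker would wrap Temporal's `WorkerFactory`), and the loader assumes each worker class has an accessible no-arg constructor; the actual follow-on may well pass config into the constructor instead.

   ```java
   /** Hypothetical loader: instantiates the worker class named in config via reflection. */
   class TemporalWorkerLoader {
     static TemporalWorker load(String workerClassName) {
       try {
         // asSubclass throws ClassCastException if the class is not a TemporalWorker.
         Class<? extends TemporalWorker> workerClass =
             Class.forName(workerClassName).asSubclass(TemporalWorker.class);
         return workerClass.getDeclaredConstructor().newInstance();
       } catch (ReflectiveOperationException | ClassCastException e) {
         throw new IllegalArgumentException("Cannot load TemporalWorker: " + workerClassName, e);
       }
     }

     public static void main(String[] args) {
       TemporalWorker worker = load(ExampleTemporalWorker.class.getName());
       worker.start();
       System.out.println(((ExampleTemporalWorker) worker).isRunning()); // true
       worker.shutdown();
       System.out.println(((ExampleTemporalWorker) worker).isRunning()); // false
     }
   }

   /** The interface proposed in the comment. */
   interface TemporalWorker {
     void start();
     void shutdown();
   }

   /** Hypothetical stand-in worker; a real one would start and shut down a Temporal WorkerFactory. */
   class ExampleTemporalWorker implements TemporalWorker {
     private volatile boolean running = false;

     @Override
     public void start() {
       running = true;
     }

     @Override
     public void shutdown() {
       running = false;
     }

     boolean isRunning() {
       return running;
     }
   }
   ```

   failing fast with `IllegalArgumentException` keeps a misconfigured class name from surfacing later as an obscure error at worker start.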



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java:
##########
@@ -211,6 +211,7 @@ public void submitEvent(GobblinTrackingEvent nonReusableEvent) {
 
     EventNotification notification = new EventNotification(nonReusableEvent);
     sendNotification(notification);
+    LOG.info("EventBuilder {} is submitted.", nonReusableEvent);

Review Comment:
   just checking this didn't sneak in by accident... does this deserve to live on beyond whatever debugging session may have inspired it?
   
   e.g. more informative and more helpful for the future might be instead to catch and log whatever exception could cause early return by `sendNotification`
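   a sketch of that alternative, with hypothetical names throughout. it assumes a failing notification target surfaces as a `RuntimeException` out of `sendNotification` (not verified against `MetricContext`), and uses `java.util.logging` only to stay self-contained; in Gobblin this would be the class's slf4j logger.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   /** Hypothetical sketch: log the failure that cuts a submission short, not every success. */
   class SubmitEventSketch {
     private static final Logger LOG = Logger.getLogger(SubmitEventSketch.class.getName());

     private final List<Consumer<String>> targets = new ArrayList<>();
     private int failedSubmissions = 0;

     void addTarget(Consumer<String> target) {
       targets.add(target);
     }

     // Stand-in for sendNotification: a throwing target ends dispatch early.
     private void sendNotification(String notification) {
       for (Consumer<String> target : targets) {
         target.accept(notification);
       }
     }

     void submitEvent(String event) {
       try {
         sendNotification(event);
       } catch (RuntimeException e) {
         failedSubmissions++;
         // The event and the cause are logged together, which is what a later reader needs.
         LOG.log(Level.WARNING, "Failed to send notification for event: " + event, e);
       }
     }

     int getFailedSubmissions() {
       return failedSubmissions;
     }

     public static void main(String[] args) {
       SubmitEventSketch ctx = new SubmitEventSketch();
       List<String> received = new ArrayList<>();
       ctx.addTarget(received::add);
       ctx.addTarget(n -> { throw new IllegalStateException("target down"); });
       ctx.submitEvent("event-1");
       System.out.println(ctx.getFailedSubmissions() + " " + received); // 1 [event-1]
     }
   }
   ```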



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Shared.java:
##########
@@ -0,0 +1,24 @@
+package org.apache.gobblin.cluster.temporal;
+
+public interface Shared {
+
+    // Define the task queue name
+    final String GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";

Review Comment:
   rather than hard-coding, the temporal queue name should be provided by config.  thus no need for this class: swap it for a new key in `GobblinClusterConfigurationKeys`
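   a sketch of the config-driven version, with a hypothetical key name and today's constant kept as the default. whatever the key, the launcher (when starting workflows) and the worker (when polling) must resolve the same value, since Temporal only hands work to workers polling that task queue.

   ```java
   import java.util.Properties;

   /** Hypothetical sketch: the Temporal task queue name read from config instead of Shared. */
   class TemporalTaskQueueConfigSketch {
     // Assumed to match GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX.
     static final String GOBBLIN_CLUSTER_PREFIX = "gobblin.cluster.";
     // Hypothetical key; the default preserves the currently hard-coded value.
     static final String TEMPORAL_TASK_QUEUE_KEY = GOBBLIN_CLUSTER_PREFIX + "temporal.taskQueue.name";
     static final String DEFAULT_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";

     /** Both the job launcher and the worker would call this, so they agree on the queue. */
     static String taskQueueName(Properties props) {
       return props.getProperty(TEMPORAL_TASK_QUEUE_KEY, DEFAULT_TEMPORAL_TASK_QUEUE);
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       System.out.println(taskQueueName(props)); // GobblinTemporalTaskQueue
       props.setProperty(TEMPORAL_TASK_QUEUE_KEY, "MyTeamQueue");
       System.out.println(taskQueueName(props)); // MyTeamQueue
     }
   }
   ```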



##########
gobblin-core/src/main/java/org/apache/gobblin/security/ssl/SSLContextFactory.java:
##########
@@ -119,7 +119,7 @@ public static SSLContext createInstance(Config srcConfig) {
         new File(trustStoreFilePath), trustStorePassword);
   }
 
-  private static InputStream toInputStream(File storeFile)
+  public static InputStream toInputStream(File storeFile)

Review Comment:
   this seems general enough to belong inside `gobblin-utility`.  define it there and then transition callers of this one to using that.  given it was `private`, there shouldn't be any files to update beyond this one.
   
   ...but that aside, I'm unclear on the rationale for the intermediate byte array.  why not instead merely `new FileInputStream`?  please explain
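   a sketch of the simpler version as it might look once moved to a utility class (`StreamUtilsSketch` is a hypothetical name and location). one trade-off worth addressing in the reply: reading the whole file into a byte array up front closes the file handle right away, whereas returning `new FileInputStream` leaves closing to the caller.

   ```java
   import java.io.File;
   import java.io.FileInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;

   /** Hypothetical utility class, e.g. somewhere in gobblin-utility. */
   final class StreamUtilsSketch {
     private StreamUtilsSketch() {}

     /** Opens the file directly, with no intermediate byte array; the caller must close it. */
     static InputStream toInputStream(File storeFile) throws IOException {
       return new FileInputStream(storeFile);
     }

     public static void main(String[] args) throws IOException {
       File tmp = File.createTempFile("store", ".jks");
       tmp.deleteOnExit();
       Files.write(tmp.toPath(), "keystore-bytes".getBytes(StandardCharsets.UTF_8));
       // try-with-resources closes the stream that toInputStream handed over.
       try (InputStream in = toInputStream(tmp)) {
         System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // keystore-bytes
       }
     }
   }
   ```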



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Workload.java:
##########
@@ -0,0 +1,37 @@
+package org.apache.gobblin.cluster.temporal;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import java.util.Iterator;
+import java.util.Optional;
+
+
+/**
+ * An assemblage of "work", modeled as sequential "task" specifications.  Given Temporal's required determinism, tasks
+ * and task spans should remain unchanged, with stable sequential ordering.  This need not constrain `Workload`s to
+ * eager, advance elaboration: "streaming" definition is possible, so long as producing a deterministic result.
+ *
+ * A actual, real-world workload might correspond to datastore contents, such as records serialized into HDFS files
+ * or ordered DB query results.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls
+

Review Comment:
   no need for extra line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinJobLauncher.java:
##########
@@ -0,0 +1,279 @@
+/**
+ * An implementation of {@link JobLauncher} that launches a Gobblin job using the Temporal task framework.
+ *
+ * <p>
+ *   Each {@link WorkUnit} of the job is persisted to the {@link FileSystem} of choice and the path to the file
+ *   storing the serialized {@link WorkUnit} is passed to the Temporal task running the {@link WorkUnit} as a
+ *   user-defined property {@link GobblinClusterConfigurationKeys#WORK_UNIT_FILE_PATH}. Upon startup, the gobblin
+ *   task reads the property for the file path and de-serializes the {@link WorkUnit} from the file.
+ * </p>
+ */
+@Alpha
+@Slf4j
+public class GobblinJobLauncher extends AbstractJobLauncher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GobblinJobLauncher.class);

Review Comment:
   why not rely solely on `@Slf4j`?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalJobLauncher.java:
##########
@@ -0,0 +1,154 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinJobLauncher;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.gobblin.cluster.temporal.TemporalWorkflowClientFactory.createClientInstance;
+import static org.apache.gobblin.cluster.temporal.TemporalWorkflowClientFactory.createServiceInstance;
+
+/**
+ * An implementation of {@link JobLauncher} that launches a Gobblin job using the Temporal task framework.
+ *
+ * <p>
+ *   Each {@link WorkUnit} of the job is persisted to the {@link FileSystem} of choice and the path to the file
+ *   storing the serialized {@link WorkUnit} is passed to the Temporal task running the {@link WorkUnit} as a
+ *   user-defined property {@link GobblinClusterConfigurationKeys#WORK_UNIT_FILE_PATH}. Upon startup, the gobblin
+ *   task reads the property for the file path and de-serializes the {@link WorkUnit} from the file.
+ * </p>
+ *
+ * <p>
+ *   This class is instantiated by the {@link GobblinTemporalJobScheduler} on every job submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Alpha
+@Slf4j
+public class GobblinTemporalJobLauncher extends GobblinJobLauncher {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTemporalJobLauncher.class);

Review Comment:
   again, isn't this redundant?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalJobScheduler.java:
##########
@@ -0,0 +1,285 @@
+package org.apache.gobblin.cluster.temporal;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gobblin.cluster.*;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * An extension to {@link JobScheduler} that schedules and runs
+ * Gobblin jobs on Temporal.
+ *
+ * <p> If the job should be launched from the scheduler node,
+ * {@link GobblinTemporalJobLauncher} is invoked.
+ * TODO(yiyang): this file should be cleaned up with HelixJobScheduler.
+ */
+@Alpha
+public class GobblinTemporalJobScheduler extends JobScheduler implements StandardMetricsBridge {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTemporalJobScheduler.class);
+  private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
+
+  private final Properties commonJobProperties;
+  private final EventBus eventBus;
+  private final Path appWorkDir;
+  private final List<? extends Tag<?>> metadataTags;
+  private final ConcurrentHashMap<String, Boolean> jobRunningMap;
+  private final MetricContext metricContext;
+  final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
+  final GobblinHelixJobLauncherMetrics launcherMetrics;
+  final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
+  final HelixJobsMapping jobsMapping;

Review Comment:
   noting again, what I previously mentioned: there's a confusing amount of reference to helix, given the framework isn't actually involved in execution



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalJobScheduler.java:
##########
@@ -0,0 +1,285 @@
+  private boolean startServicesCompleted;
+
+  public GobblinTemporalJobScheduler(Config sysConfig,
+                                     EventBus eventBus,
+                                     Path appWorkDir, List<? extends Tag<?>> metadataTags,
+                                     SchedulerService schedulerService) throws Exception {

Review Comment:
   line-continuation indentation should be four spaces.  see the `GobblinHelixJobLauncher` ctor as an example



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAppMasterSecurityManager.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Throwables;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
+
+
+public class YarnTemporalAppMasterSecurityManager extends YarnContainerSecurityManager{

Review Comment:
   needs javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
