[
https://issues.apache.org/jira/browse/GOBBLIN-1569?focusedWorklogId=674407&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-674407
]
ASF GitHub Bot logged work on GOBBLIN-1569:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 03/Nov/21 06:44
Start Date: 03/Nov/21 06:44
Worklog Time Spent: 10m
Work Description: phet commented on a change in pull request #3421:
URL: https://github.com/apache/gobblin/pull/3421#discussion_r741592138
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_serde.GsonJobSpecSerDe;
+import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MySQL-backed Job Catalog for persisting (`JobSpec`) job configuration
information. Fully support (mutation) listeners and metrics.
+ */
+public class MysqlJobCatalog extends JobCatalogBase implements
MutableJobCatalog {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MysqlJobCatalog.class);
+ public static final String DB_CONFIG_PREFIX = "mysqlJobCatalog";
Review comment:
it's named after its purpose/function, to read clearly when used; e.g.
from the unit test:
```
Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.METRICS_ENABLED_KEY, "true")
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
...
```
what do you think?
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_serde.GsonJobSpecSerDe;
+import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MySQL-backed Job Catalog for persisting (`JobSpec`) job configuration
information. Fully support (mutation) listeners and metrics.
Review comment:
sure, will rephrase to:
```
MySQL-backed Job Catalog for persisting (`JobSpec`) job configuration
information, which fully supports (mutation) listeners and metrics.
```
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_serde.GsonJobSpecSerDe;
+import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MySQL-backed Job Catalog for persisting (`JobSpec`) job configuration
information. Fully support (mutation) listeners and metrics.
+ */
+public class MysqlJobCatalog extends JobCatalogBase implements
MutableJobCatalog {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MysqlJobCatalog.class);
+ public static final String DB_CONFIG_PREFIX = "mysqlJobCatalog";
+
+ protected final MutableJobCatalog.MutableStandardMetrics mutableMetrics;
+ protected final MysqlBaseSpecStore jobSpecStore;
+
+ /**
+ * Initialize, with DB config contextualized by `DB_CONFIG_PREFIX`.
+ */
+ public MysqlJobCatalog(Config sysConfig)
+ throws IOException {
+ this(sysConfig, Optional.<MetricContext>absent(),
GobblinMetrics.isEnabled(sysConfig));
+ }
+
+ public MysqlJobCatalog(GobblinInstanceEnvironment env) throws IOException {
+ super(env);
+ this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+ this.jobSpecStore = createJobSpecStore(env.getSysConfig().getConfig());
+ }
+
+ public MysqlJobCatalog(Config sysConfig, Optional<MetricContext>
parentMetricContext,
+ boolean instrumentationEnabled) throws IOException {
+ super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled,
Optional.of(sysConfig));
+ this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+ this.jobSpecStore = createJobSpecStore(sysConfig);
+ }
+
+ @Override
+ protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config>
sysConfig) {
+ log.info("create standard metrics {} for {}",
MutableJobCatalog.MutableStandardMetrics.class.getName(),
this.getClass().getName());
+ return new MutableJobCatalog.MutableStandardMetrics(this, sysConfig);
+ }
+
+ protected MysqlBaseSpecStore createJobSpecStore(Config sysConfig) {
+ try {
+ return new MysqlBaseSpecStore(sysConfig, new GsonJobSpecSerDe()) {
+ @Override
+ protected String getConfigPrefix() {
+ return MysqlJobCatalog.DB_CONFIG_PREFIX;
+ }
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("unable to create `JobSpec` store", e);
+ }
+ }
+
+ /** @return all {@link JobSpec}s */
+ @Override
+ public List<JobSpec> getJobs() {
+ try {
+ return (List) jobSpecStore.getSpecs();
+ } catch (IOException e) {
+ throw new RuntimeException("error getting (all) job specs", e);
+ }
+ }
+
+ /**
+ * Obtain an iterator to fetch all job specifications. Unlike {@link
#getJobs()}, this method avoids loading
+ * all job configs into memory in the very beginning.
+ * Interleaving notes: jobs added/modified/deleted between `Iterator`
creation and exhaustion MAY or MAY NOT be reflected.
+ *
+ * @return an iterator for (all) present {@link JobSpec}s
+ */
+ @Override
+ public Iterator<JobSpec> getJobSpecIterator() {
+ try {
+ return Iterators.<Optional<JobSpec>, JobSpec>transform(
+ Iterators.filter(
+ Iterators.transform(jobSpecStore.getSpecURIs(), uri -> {
+ try {
+ return Optional.of(MysqlJobCatalog.this.getJobSpec(uri));
+ } catch (JobSpecNotFoundException e) {
+ MysqlJobCatalog.this.log.info("unable to retrieve previously
identified JobSpec by URI '{}'", uri);
+ return Optional.absent();
+ }}),
+ Optional::isPresent),
+ Optional::get);
+ } catch (IOException e) {
+ throw new RuntimeException("error iterating (all) job specs", e);
+ }
+ }
+
+ /**
+ * Fetch single {@link JobSpec} by URI.
+ * @return the `JobSpec`
+ * @throws {@link JobSpecNotFoundException}
+ */
+ @Override
+ public JobSpec getJobSpec(URI uri)
+ throws JobSpecNotFoundException {
+ Preconditions.checkNotNull(uri);
+ try {
+ return (JobSpec) jobSpecStore.getSpec(uri);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("error accessing job spec
'%s'", uri), e);
+ } catch (SpecNotFoundException e) {
+ throw new JobSpecNotFoundException(uri);
+ }
+ }
+
+ /**
+ * Add or update (when an existing) {@link JobSpec}, triggering the
appropriate
+ * {@link org.apache.gobblin.runtime.api.JobCatalogListener} callback.
+ *
+ * NOTE: `synchronized` (w/ `remove()`) for integrity of (existence)
check-then-update.
+ */
+ @Override
+ public synchronized void put(JobSpec jobSpec) {
+ Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
+ Preconditions.checkNotNull(jobSpec);
+ try {
+ long startTime = System.currentTimeMillis();
+ boolean isUpdate = jobSpecStore.exists(jobSpec.getUri());
+ if (isUpdate) {
+ try {
+ jobSpecStore.updateSpec(jobSpec);
+ this.mutableMetrics.updatePutJobTime(startTime);
+ this.listeners.onUpdateJob(jobSpec);
+ } catch (SpecNotFoundException e) { // should never happen (since
`synchronized`)
+ throw new RuntimeException(String.format("error finding spec to
update '%s'", jobSpec.getUri()), e);
+ }
+ } else {
+ jobSpecStore.addSpec(jobSpec);
+ this.mutableMetrics.updatePutJobTime(startTime);
+ this.listeners.onAddJob(jobSpec);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("error updating or adding
JobSpec '%s'", jobSpec.getUri()), e);
+ }
+ }
+
+ /**
+ * Delete (an existing) {@link JobSpec}, triggering the appropriate {@link
org.apache.gobblin.runtime.api.JobCatalogListener} callback.
+ *
+ * NOTE: `synchronized` w/ `put()` to protect its check-then-update.
+ */
+ @Override
+ public synchronized void remove(URI jobURI) {
+ remove(jobURI, false);
+ }
+
+ /**
+ * NOTE: `synchronized` w/ `put()` to protect its check-then-update.
+ *
+ * @param alwaysTriggerListeners whether invariably to trigger {@link
org.apache.gobblin.runtime.api.JobCatalogListener#onCancelJob(URI)}
+ */
+ @Override
+ public synchronized void remove(URI jobURI, boolean alwaysTriggerListeners) {
+ Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
+ Preconditions.checkNotNull(jobURI);
+ try {
+ long startTime = System.currentTimeMillis();
+ JobSpec jobSpec = (JobSpec) jobSpecStore.getSpec(jobURI);
+ jobSpecStore.deleteSpec(jobURI);
+ this.mutableMetrics.updateRemoveJobTime(startTime);
+ this.listeners.onDeleteJob(jobURI, jobSpec.getVersion());
Review comment:
I lack the history on this one. I'm also not following what additional
spec store calls arise as a result.
(overall I'm not opposed to streamlining the functionality on offer...)
please either provide more background context, or, if that effort would be a
separate follow-on, we could save the discussion for later.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+
+
+/**
+ * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that
currently only {@link JobSpec}s are supported.
+ */
+public class GsonJobSpecSerDe implements SpecSerDe {
+ private GsonSerDe<JobSpec> gsonSerDe;
Review comment:
yes, true
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+
+
+/**
+ * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that
currently only {@link JobSpec}s are supported.
+ */
+public class GsonJobSpecSerDe implements SpecSerDe {
Review comment:
good suggestion... I guess I took the prior impl at face value, but I
was able to re-work w/ generics into an abstract base class doing the heavy
lifting.
since java lacks a type class resolution capability, we still need to create
a specific, named derivation to supply the serializer and the deserializer.
handily, that non-generic class name also imparts the ability to refer to the
type within HOCON or even simple properties-based config.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_serde.GsonJobSpecSerDe;
+import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MySQL-backed Job Catalog for persisting (`JobSpec`) job configuration
information. Fully support (mutation) listeners and metrics.
+ */
+public class MysqlJobCatalog extends JobCatalogBase implements
MutableJobCatalog {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MysqlJobCatalog.class);
+ public static final String DB_CONFIG_PREFIX = "mysqlJobCatalog";
+
+ protected final MutableJobCatalog.MutableStandardMetrics mutableMetrics;
+ protected final MysqlBaseSpecStore jobSpecStore;
+
+ /**
+ * Initialize, with DB config contextualized by `DB_CONFIG_PREFIX`.
+ */
+ public MysqlJobCatalog(Config sysConfig)
+ throws IOException {
+ this(sysConfig, Optional.<MetricContext>absent(),
GobblinMetrics.isEnabled(sysConfig));
+ }
+
+ public MysqlJobCatalog(GobblinInstanceEnvironment env) throws IOException {
+ super(env);
+ this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+ this.jobSpecStore = createJobSpecStore(env.getSysConfig().getConfig());
+ }
+
+ public MysqlJobCatalog(Config sysConfig, Optional<MetricContext>
parentMetricContext,
+ boolean instrumentationEnabled) throws IOException {
+ super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled,
Optional.of(sysConfig));
+ this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+ this.jobSpecStore = createJobSpecStore(sysConfig);
+ }
+
+ @Override
+ protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config>
sysConfig) {
+ log.info("create standard metrics {} for {}",
MutableJobCatalog.MutableStandardMetrics.class.getName(),
this.getClass().getName());
+ return new MutableJobCatalog.MutableStandardMetrics(this, sysConfig);
+ }
+
+ protected MysqlBaseSpecStore createJobSpecStore(Config sysConfig) {
+ try {
+ return new MysqlBaseSpecStore(sysConfig, new GsonJobSpecSerDe()) {
+ @Override
+ protected String getConfigPrefix() {
+ return MysqlJobCatalog.DB_CONFIG_PREFIX;
+ }
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("unable to create `JobSpec` store", e);
+ }
+ }
+
+ /** @return all {@link JobSpec}s */
+ @Override
+ public List<JobSpec> getJobs() {
+ try {
+ return (List) jobSpecStore.getSpecs();
+ } catch (IOException e) {
+ throw new RuntimeException("error getting (all) job specs", e);
+ }
+ }
+
+ /**
+ * Obtain an iterator to fetch all job specifications. Unlike {@link
#getJobs()}, this method avoids loading
+ * all job configs into memory in the very beginning.
+ * Interleaving notes: jobs added/modified/deleted between `Iterator`
creation and exhaustion MAY or MAY NOT be reflected.
+ *
+ * @return an iterator for (all) present {@link JobSpec}s
+ */
+ @Override
+ public Iterator<JobSpec> getJobSpecIterator() {
+ try {
+ return Iterators.<Optional<JobSpec>, JobSpec>transform(
+ Iterators.filter(
+ Iterators.transform(jobSpecStore.getSpecURIs(), uri -> {
+ try {
+ return Optional.of(MysqlJobCatalog.this.getJobSpec(uri));
+ } catch (JobSpecNotFoundException e) {
+ MysqlJobCatalog.this.log.info("unable to retrieve previously
identified JobSpec by URI '{}'", uri);
+ return Optional.absent();
+ }}),
+ Optional::isPresent),
+ Optional::get);
+ } catch (IOException e) {
+ throw new RuntimeException("error iterating (all) job specs", e);
+ }
+ }
+
+ /**
+ * Fetch single {@link JobSpec} by URI.
+ * @return the `JobSpec`
+ * @throws {@link JobSpecNotFoundException}
+ */
+ @Override
+ public JobSpec getJobSpec(URI uri)
+ throws JobSpecNotFoundException {
+ Preconditions.checkNotNull(uri);
+ try {
+ return (JobSpec) jobSpecStore.getSpec(uri);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("error accessing job spec
'%s'", uri), e);
+ } catch (SpecNotFoundException e) {
+ throw new JobSpecNotFoundException(uri);
+ }
+ }
+
+ /**
+ * Add or update (when an existing) {@link JobSpec}, triggering the
appropriate
+ * {@link org.apache.gobblin.runtime.api.JobCatalogListener} callback.
+ *
+ * NOTE: `synchronized` (w/ `remove()`) for integrity of (existence)
check-then-update.
+ */
+ @Override
+ public synchronized void put(JobSpec jobSpec) {
+ Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
+ Preconditions.checkNotNull(jobSpec);
+ try {
+ long startTime = System.currentTimeMillis();
+ boolean isUpdate = jobSpecStore.exists(jobSpec.getUri());
+ if (isUpdate) {
+ try {
+ jobSpecStore.updateSpec(jobSpec);
+ this.mutableMetrics.updatePutJobTime(startTime);
Review comment:
I wanted to conclude the timing prior to listener dispatching,
especially since that's a `synchronized` method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 674407)
Time Spent: 3h 10m (was: 3h)
> Provide JobCatalog backed by MySQL
> ----------------------------------
>
> Key: GOBBLIN-1569
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1569
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Present storage bindings for `JobCatalog` are in-memory or file system.
> While the latter is durable, it requires an NFS-like capability for
> cross-instance sharing.
> Support alternative multi-instance backing through MySQL, precluding the need
> for NFS.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)