This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6691a37 [GOBBLIN-1519] Simplify InstrumentedSpecStore implementation
(#3369)
6691a37 is described below
commit 6691a37d863d35cb5d217355ff8667f29cdb03e8
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Aug 19 17:08:00 2021 -0700
[GOBBLIN-1519] Simplify InstrumentedSpecStore implementation (#3369)
this is an internal, implementation-level refactoring that should not
modify externally-observable behavior. it was motivated by the repetitive, near
boiler plate for timing each operation in InstrumentedSpecStore, as noticed in
the recent PR - #3367. it:
* simplifies that class by DRYing it up through a companion helper class.
* preserves method signatures for source compatibility
* measures nanoseconds, rather than milliseconds, since that is what
metrics.Timer trades in internally
---
.../gobblin/runtime/api/InstrumentedSpecStore.java | 159 ++++++++++-----------
1 file changed, 73 insertions(+), 86 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 94585a0..901ef88 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -41,139 +41,126 @@ import org.apache.gobblin.util.ConfigUtils;
* Subclasses should implement addSpecImpl instead of addSpec and so on.
*/
public abstract class InstrumentedSpecStore implements SpecStore {
- private Optional<Timer> getTimer;
- private Optional<Timer> existsTimer;
- private Optional<Timer> deleteTimer;
- private Optional<Timer> addTimer;
- private Optional<Timer> updateTimer;
- private Optional<Timer> getAllTimer;
- private Optional<Timer> getURIsTimer;
+
+ /** Record timing for an operation, when an `Optional<Timer>.isPresent()`,
otherwise simply perform the operation w/o timing */
+ static class OptionallyTimingInvoker {
+
+ /** `j.u.function.Supplier` variant for an operation that may @throw
IOException: preserves method signature, including checked exceptions */
+ @FunctionalInterface
+ public interface SupplierMayThrowIO<T> {
+ public T get() throws IOException;
+ }
+
+ /** `j.u.function.Supplier` variant for an operation that may @throw
IOException or SpecNotFoundException: preserves method signature exceptions */
+ @FunctionalInterface
+ public interface SupplierMayThrowBoth<T> {
+ public T get() throws IOException, SpecNotFoundException;
+ }
+
+ private final Optional<Timer> timer;
+
+ public OptionallyTimingInvoker(Optional<Timer> timer) {
+ this.timer = timer != null ? timer : Optional.absent();
+ }
+
+ public <T> T invokeMayThrowIO(SupplierMayThrowIO<T> f) throws IOException {
+ try {
+ return invokeMayThrowBoth(() -> f.get());
+ } catch (SpecNotFoundException e) {
+ throw new RuntimeException("IMPOSSIBLE - prohibited by static
checking!", e);
+ }
+ }
+
+ public <T> T invokeMayThrowBoth(SupplierMayThrowBoth<T> f) throws
IOException, SpecNotFoundException {
+ if (timer.isPresent()) {
+ final long startTimeNanos = System.nanoTime(); // ns resolution, being
internal granularity of `metrics.Timer`
+ final T result = f.get();
+ timer.get().update(System.nanoTime() - startTimeNanos,
TimeUnit.NANOSECONDS);
+ return result;
+ } else { // skip timing, when no `Timer`
+ return f.get();
+ }
+ }
+ }
+
+ private OptionallyTimingInvoker getTimer;
+ private OptionallyTimingInvoker existsTimer;
+ private OptionallyTimingInvoker deleteTimer;
+ private OptionallyTimingInvoker addTimer;
+ private OptionallyTimingInvoker updateTimer;
+ private OptionallyTimingInvoker getAllTimer;
+ private OptionallyTimingInvoker getSizeTimer;
+ private OptionallyTimingInvoker getURIsTimer;
private MetricContext metricContext;
private final boolean instrumentationEnabled;
public InstrumentedSpecStore(Config config, SpecSerDe specSerDe) {
this.instrumentationEnabled = GobblinMetrics.isEnabled(new
State(ConfigUtils.configToProperties(config)));
this.metricContext = Instrumented.getMetricContext(new State(),
getClass());
- this.getTimer = createTimer("-GET");
- this.existsTimer = createTimer("-EXISTS");
- this.deleteTimer = createTimer("-DELETE");
- this.addTimer = createTimer("-ADD");
- this.updateTimer = createTimer("-UPDATE");
- this.getAllTimer = createTimer("-GETALL");
- this.getURIsTimer = createTimer("-GETURIS");
+ this.getTimer = createTimingInvoker("-GET");
+ this.existsTimer = createTimingInvoker("-EXISTS");
+ this.deleteTimer = createTimingInvoker("-DELETE");
+ this.addTimer = createTimingInvoker("-ADD");
+ this.updateTimer = createTimingInvoker("-UPDATE");
+ this.getAllTimer = createTimingInvoker("-GETALL");
+ this.getSizeTimer = createTimingInvoker("-GETCOUNT");
+ this.getURIsTimer = createTimingInvoker("-GETURIS");
+ }
+
+ private OptionallyTimingInvoker createTimingInvoker(String suffix) {
+ return new OptionallyTimingInvoker(createTimer(suffix));
}
private Optional<Timer> createTimer(String suffix) {
return instrumentationEnabled
- ?
Optional.of(this.metricContext.timer(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,getClass().getSimpleName(),
suffix)))
+ ?
Optional.of(this.metricContext.timer(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
getClass().getSimpleName(), suffix)))
: Optional.absent();
}
@Override
public boolean exists(URI specUri) throws IOException {
- if (!instrumentationEnabled) {
- return existsImpl(specUri);
- } else {
- long startTimeMillis = System.currentTimeMillis();
- boolean ret = existsImpl(specUri);
- Instrumented.updateTimer(this.existsTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return ret;
- }
+ return this.existsTimer.invokeMayThrowIO(() -> existsImpl(specUri));
}
@Override
public void addSpec(Spec spec) throws IOException {
- if (!instrumentationEnabled) {
- addSpecImpl(spec);
- } else {
- long startTimeMillis = System.currentTimeMillis();
- addSpecImpl(spec);
- Instrumented.updateTimer(this.addTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- }
+ this.addTimer.invokeMayThrowIO(() -> { addSpecImpl(spec); /* sadly, unable
to infer `SupplierMayThrowIO<Void>`, thus explicitly... */ return null; });
}
@Override
public boolean deleteSpec(URI specUri) throws IOException {
- if (!instrumentationEnabled) {
- return deleteSpecImpl(specUri);
- } else {
- long startTimeMillis = System.currentTimeMillis();
- boolean ret = deleteSpecImpl(specUri);
- Instrumented.updateTimer(this.deleteTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return ret;
- }
+ return this.deleteTimer.invokeMayThrowIO(() -> deleteSpecImpl(specUri));
}
@Override
public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
- if (!instrumentationEnabled) {
- return getSpecImpl(specUri);
- } else {
- long startTimeMillis = System.currentTimeMillis();
- Spec spec = getSpecImpl(specUri);
- Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return spec;
- }
+ return this.getTimer.invokeMayThrowBoth(() -> getSpecImpl(specUri));
}
@Override
public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) throws
IOException {
- if (!instrumentationEnabled) {
- return getSpecsImpl(specSearchObject);
- } else {
- long startTimeMillis = System.currentTimeMillis();
- Collection<Spec> specs = getSpecsImpl(specSearchObject);
- Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return specs;
- }
+ // NOTE: uses same `getTimer` as `getSpec`; TODO: explore separating,
since measuring different operation
+ return this.getTimer.invokeMayThrowIO(() ->
getSpecsImpl(specSearchObject));
}
@Override
public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
- if (!instrumentationEnabled) {
- return updateSpecImpl(spec);
- } else {
- long startTimeMillis = System.currentTimeMillis();
- Spec ret = updateSpecImpl(spec);
- Instrumented.updateTimer(this.updateTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return ret;
- }
+ return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec));
}
@Override
public Collection<Spec> getSpecs() throws IOException {
- if (!instrumentationEnabled) {
- return getSpecsImpl();
- } else {
- long startTimeMillis = System.currentTimeMillis();
- Collection<Spec> spec = getSpecsImpl();
- Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return spec;
- }
+ return this.getAllTimer.invokeMayThrowIO(() -> getSpecsImpl());
}
@Override
public Iterator<URI> getSpecURIs() throws IOException {
- if (!instrumentationEnabled) {
- return getSpecURIsImpl();
- } else {
- long startTimeMillis = System.currentTimeMillis();
- Iterator<URI> specURIs = getSpecURIsImpl();
- Instrumented.updateTimer(this.getURIsTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return specURIs;
- }
+ return this.getURIsTimer.invokeMayThrowIO(() -> getSpecURIsImpl());
}
@Override
public int getSize() throws IOException {
- if (!instrumentationEnabled) {
- return getSizeImpl();
- } else {
- long startTimeMillis = System.currentTimeMillis();
- int size = getSizeImpl();
- Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() -
startTimeMillis, TimeUnit.MILLISECONDS);
- return size;
- }
+ return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
}
public abstract void addSpecImpl(Spec spec) throws IOException;