This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6691a37  [GOBBLIN-1519] Simplify InstrumentedSpecStore implementation 
(#3369)
6691a37 is described below

commit 6691a37d863d35cb5d217355ff8667f29cdb03e8
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Aug 19 17:08:00 2021 -0700

    [GOBBLIN-1519] Simplify InstrumentedSpecStore implementation (#3369)
    
    this is an internal, implementation-level refactoring that should not 
modify externally-observable behavior. it was motivated by the repetitive, near 
boiler plate for timing each operation in InstrumentedSpecStore, as noticed in 
the recent PR - #3367. it:
    
    * simplifies that class by DRYing it up through a companion helper class.
    * preserves method signatures for source compatibility
    * measures nanoseconds, rather than milliseconds, since that is what 
metrics.Timer trades in internally
---
 .../gobblin/runtime/api/InstrumentedSpecStore.java | 159 ++++++++++-----------
 1 file changed, 73 insertions(+), 86 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 94585a0..901ef88 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -41,139 +41,126 @@ import org.apache.gobblin.util.ConfigUtils;
  * Subclasses should implement addSpecImpl instead of addSpec and so on.
  */
 public abstract class InstrumentedSpecStore implements SpecStore {
-  private Optional<Timer> getTimer;
-  private Optional<Timer> existsTimer;
-  private Optional<Timer> deleteTimer;
-  private Optional<Timer> addTimer;
-  private Optional<Timer> updateTimer;
-  private Optional<Timer> getAllTimer;
-  private Optional<Timer> getURIsTimer;
+
+  /** Record timing for an operation, when an `Optional<Timer>.isPresent()`, 
otherwise simply perform the operation w/o timing */
+  static class OptionallyTimingInvoker {
+
+    /** `j.u.function.Supplier` variant for an operation that may @throw 
IOException: preserves method signature, including checked exceptions */
+    @FunctionalInterface
+    public interface SupplierMayThrowIO<T> {
+      public T get() throws IOException;
+    }
+
+    /** `j.u.function.Supplier` variant for an operation that may @throw 
IOException or SpecNotFoundException: preserves method signature exceptions */
+    @FunctionalInterface
+    public interface SupplierMayThrowBoth<T> {
+      public T get() throws IOException, SpecNotFoundException;
+    }
+
+    private final Optional<Timer> timer;
+
+    public OptionallyTimingInvoker(Optional<Timer> timer) {
+      this.timer = timer != null ? timer : Optional.absent();
+    }
+
+    public <T> T invokeMayThrowIO(SupplierMayThrowIO<T> f) throws IOException {
+      try {
+        return invokeMayThrowBoth(() -> f.get());
+      } catch (SpecNotFoundException e) {
+        throw new RuntimeException("IMPOSSIBLE - prohibited by static 
checking!", e);
+      }
+    }
+
+    public <T> T invokeMayThrowBoth(SupplierMayThrowBoth<T> f) throws 
IOException, SpecNotFoundException {
+      if (timer.isPresent()) {
+        final long startTimeNanos = System.nanoTime(); // ns resolution, being 
internal granularity of `metrics.Timer`
+        final T result = f.get();
+        timer.get().update(System.nanoTime() - startTimeNanos, 
TimeUnit.NANOSECONDS);
+        return result;
+      } else { // skip timing, when no `Timer`
+        return f.get();
+      }
+    }
+  }
+
+  private OptionallyTimingInvoker getTimer;
+  private OptionallyTimingInvoker existsTimer;
+  private OptionallyTimingInvoker deleteTimer;
+  private OptionallyTimingInvoker addTimer;
+  private OptionallyTimingInvoker updateTimer;
+  private OptionallyTimingInvoker getAllTimer;
+  private OptionallyTimingInvoker getSizeTimer;
+  private OptionallyTimingInvoker getURIsTimer;
   private MetricContext metricContext;
   private final boolean instrumentationEnabled;
 
   public InstrumentedSpecStore(Config config, SpecSerDe specSerDe) {
     this.instrumentationEnabled = GobblinMetrics.isEnabled(new 
State(ConfigUtils.configToProperties(config)));
     this.metricContext = Instrumented.getMetricContext(new State(), 
getClass());
-    this.getTimer = createTimer("-GET");
-    this.existsTimer = createTimer("-EXISTS");
-    this.deleteTimer = createTimer("-DELETE");
-    this.addTimer = createTimer("-ADD");
-    this.updateTimer = createTimer("-UPDATE");
-    this.getAllTimer = createTimer("-GETALL");
-    this.getURIsTimer = createTimer("-GETURIS");
+    this.getTimer = createTimingInvoker("-GET");
+    this.existsTimer = createTimingInvoker("-EXISTS");
+    this.deleteTimer = createTimingInvoker("-DELETE");
+    this.addTimer = createTimingInvoker("-ADD");
+    this.updateTimer = createTimingInvoker("-UPDATE");
+    this.getAllTimer = createTimingInvoker("-GETALL");
+    this.getSizeTimer = createTimingInvoker("-GETCOUNT");
+    this.getURIsTimer = createTimingInvoker("-GETURIS");
+  }
+
+  private OptionallyTimingInvoker createTimingInvoker(String suffix) {
+    return new OptionallyTimingInvoker(createTimer(suffix));
   }
 
   private Optional<Timer> createTimer(String suffix) {
     return instrumentationEnabled
-        ? 
Optional.of(this.metricContext.timer(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,getClass().getSimpleName(),
 suffix)))
+        ? 
Optional.of(this.metricContext.timer(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 getClass().getSimpleName(), suffix)))
         : Optional.absent();
   }
 
   @Override
   public boolean exists(URI specUri) throws IOException {
-    if (!instrumentationEnabled) {
-      return existsImpl(specUri);
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      boolean ret = existsImpl(specUri);
-      Instrumented.updateTimer(this.existsTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return ret;
-    }
+    return this.existsTimer.invokeMayThrowIO(() -> existsImpl(specUri));
   }
 
   @Override
   public void addSpec(Spec spec) throws IOException {
-    if (!instrumentationEnabled) {
-      addSpecImpl(spec);
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      addSpecImpl(spec);
-      Instrumented.updateTimer(this.addTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-    }
+    this.addTimer.invokeMayThrowIO(() -> { addSpecImpl(spec); /* sadly, unable 
to infer `SupplierMayThrowIO<Void>`, thus explicitly... */ return null; });
   }
 
   @Override
   public boolean deleteSpec(URI specUri) throws IOException {
-    if (!instrumentationEnabled) {
-      return deleteSpecImpl(specUri);
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      boolean ret = deleteSpecImpl(specUri);
-      Instrumented.updateTimer(this.deleteTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return ret;
-    }
+    return this.deleteTimer.invokeMayThrowIO(() -> deleteSpecImpl(specUri));
   }
 
   @Override
   public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
-    if (!instrumentationEnabled) {
-      return getSpecImpl(specUri);
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      Spec spec = getSpecImpl(specUri);
-      Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return spec;
-    }
+    return this.getTimer.invokeMayThrowBoth(() -> getSpecImpl(specUri));
   }
 
   @Override
   public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) throws 
IOException {
-    if (!instrumentationEnabled) {
-      return getSpecsImpl(specSearchObject);
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      Collection<Spec> specs = getSpecsImpl(specSearchObject);
-      Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return specs;
-    }
+    // NOTE: uses same `getTimer` as `getSpec`; TODO: explore separating, 
since measuring different operation
+    return this.getTimer.invokeMayThrowIO(() -> 
getSpecsImpl(specSearchObject));
   }
 
   @Override
   public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
-    if (!instrumentationEnabled) {
-      return updateSpecImpl(spec);
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      Spec ret = updateSpecImpl(spec);
-      Instrumented.updateTimer(this.updateTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return ret;
-    }
+    return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec));
   }
 
   @Override
   public Collection<Spec> getSpecs() throws IOException {
-    if (!instrumentationEnabled) {
-      return getSpecsImpl();
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      Collection<Spec> spec = getSpecsImpl();
-      Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return spec;
-    }
+    return this.getAllTimer.invokeMayThrowIO(() -> getSpecsImpl());
   }
 
   @Override
   public Iterator<URI> getSpecURIs() throws IOException {
-    if (!instrumentationEnabled) {
-      return getSpecURIsImpl();
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      Iterator<URI> specURIs = getSpecURIsImpl();
-      Instrumented.updateTimer(this.getURIsTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return specURIs;
-    }
+    return this.getURIsTimer.invokeMayThrowIO(() -> getSpecURIsImpl());
   }
 
   @Override
   public int getSize() throws IOException {
-    if (!instrumentationEnabled) {
-      return getSizeImpl();
-    } else {
-      long startTimeMillis = System.currentTimeMillis();
-      int size = getSizeImpl();
-      Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() - 
startTimeMillis, TimeUnit.MILLISECONDS);
-      return size;
-    }
+    return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
   }
 
   public abstract void addSpecImpl(Spec spec) throws IOException;

Reply via email to