Repository: metron
Updated Branches:
  refs/heads/master a8b555dcc -> ed50d48bb


METRON-1518 Build Failure When Using Profile HDP-2.5.0.0 (nickwallen) closes 
apache/metron#986


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ed50d48b
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ed50d48b
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ed50d48b

Branch: refs/heads/master
Commit: ed50d48bb47cf3f301884f6e18fe4efc8c1b91f1
Parents: a8b555d
Author: nickwallen <n...@nickallen.org>
Authored: Tue Apr 10 17:16:20 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Tue Apr 10 17:16:20 2018 -0400

----------------------------------------------------------------------
 .../profiler/bolt/ProfileBuilderBolt.java       | 51 +++++---------------
 1 file changed, 11 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/ed50d48b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index fb3d2d0..ca02b58 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -42,13 +42,11 @@ import 
org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.zookeeper.SimpleEventListener;
 import org.apache.metron.zookeeper.ZKCache;
-import org.apache.storm.StormTimer;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.windowing.TupleWindow;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -59,9 +57,9 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
 
 import static java.lang.String.format;
 import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
@@ -155,8 +153,8 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
   private FlushSignal activeFlushSignal;
 
   /**
-   * A timer that flushes expired profiles on a regular interval. The expired 
profiles
-   * are flushed on a separate thread.
+   * An executor that flushes expired profiles at a regular interval on a 
separate
+   * thread.
    *
    * <p>Flushing expired profiles ensures that any profiles that stop 
receiving messages
    * for an extended period of time will continue to be flushed.
@@ -164,7 +162,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
    * <p>This introduces concurrency issues as the bolt is no longer single 
threaded. Due
    * to this, all access to the {@code MessageDistributor} needs to be 
protected.
    */
-  private StormTimer expiredFlushTimer;
+  private transient ScheduledExecutorService flushExpiredExecutor;
 
   public ProfileBuilderBolt() {
     this.emitters = new ArrayList<>();
@@ -202,7 +200,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
     this.configurations = new ProfilerConfigurations();
     this.activeFlushSignal = new 
FixedFrequencyFlushSignal(periodDurationMillis);
     setupZookeeper();
-    startExpiredFlushTimer();
+    startFlushingExpiredProfiles();
   }
 
   @Override
@@ -210,7 +208,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
     try {
       zookeeperCache.close();
       zookeeperClient.close();
-      expiredFlushTimer.close();
+      flushExpiredExecutor.shutdown();
 
     } catch(Throwable e) {
       LOG.error("Exception when cleaning up", e);
@@ -421,39 +419,12 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
   }
 
   /**
-   * Converts milliseconds to seconds and handles an ugly cast.
-   *
-   * @param millis Duration in milliseconds.
-   * @return Duration in seconds.
-   */
-  private int toSeconds(long millis) {
-    return (int) TimeUnit.MILLISECONDS.toSeconds(millis);
-  }
-
-  /**
-   * Creates a timer that regularly flushes expired profiles on a separate 
thread.
-   */
-  private void startExpiredFlushTimer() {
-
-    expiredFlushTimer = createTimer("flush-expired-profiles-timer");
-    expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), 
() -> flushExpired());
-  }
-
-  /**
-   * Creates a timer that can execute a task on a fixed interval.
-   *
-   * <p>If the timer encounters an exception, the entire process will be 
killed.
-   *
-   * @param name The name of the timer.
-   * @return The timer.
+   * Creates a separate thread that regularly flushes expired profiles.
    */
-  private StormTimer createTimer(String name) {
+  private void startFlushingExpiredProfiles() {
 
-    return new StormTimer(name, (thread, exception) -> {
-      String msg = String.format("Unexpected exception in timer task; 
timer=%s", name);
-      LOG.error(msg, exception);
-      Utils.exitProcess(1, msg);
-    });
+    flushExpiredExecutor = Executors.newSingleThreadScheduledExecutor();
+    flushExpiredExecutor.scheduleAtFixedRate(() -> flushExpired(), 0, 
profileTimeToLiveMillis, TimeUnit.MILLISECONDS);
   }
 
   @Override

Reply via email to