homatthew commented on code in PR #3539:
URL: https://github.com/apache/gobblin/pull/3539#discussion_r957740785


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java:
##########
@@ -16,28 +16,68 @@
  */
 package org.apache.gobblin.runtime.messaging;
 
-import java.time.Duration;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Abstraction for receiving {@link DynamicWorkUnitMessage} from {@link DynamicWorkUnitProducer}.
  * The class is responsible for fetching the messages from the messaging service. All business logic
  * should be done in the {@link DynamicWorkUnitMessage.Handler}.<br><br>
  *
- * For polling implementations (e.g. HDFS or Kafka), you can use the
- * {@link DynamicWorkUnitUtils#runInBackground(Runnable, Duration)} to call the
+ * This consumer can be used to poll a message buffer (e.g. HDFS or Kafka) using
+ * {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} to call the
  * {@link Runnable#run()} method periodically in a background thread <br><br>
  *
- * Push based implementations (e.g. helix or zk) can omit using this method and instead setup the callback methods
- * without spawning a background thread
+ * Each new {@link DynamicWorkUnitMessage} is passed to every registered {@link DynamicWorkUnitMessage.Handler}
+ * by calling {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)}
  */
-public abstract class DynamicWorkUnitConsumer implements Runnable {
-  protected List<DynamicWorkUnitMessage.Handler> handlers = new ArrayList<>();
+public class DynamicWorkUnitConsumer implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicWorkUnitConsumer.class);
+  protected MessageBuffer<DynamicWorkUnitMessage> buffer;
+  protected List<DynamicWorkUnitMessage.Handler> handlers;
+
+  public DynamicWorkUnitConsumer(
+      MessageBuffer<DynamicWorkUnitMessage> buffer,
+      Collection<DynamicWorkUnitMessage.Handler> handlers) {
+    this.buffer = buffer;
+    for(DynamicWorkUnitMessage.Handler handler : handlers) {
+      handlers.add(handler);
+    }
+  }
+
+  /**
+   * Fetches all unread messages sent by {@link DynamicWorkUnitProducer} and
+   * calls the {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)} method of each handler added via
+   * {@link DynamicWorkUnitConsumer#DynamicWorkUnitConsumer(MessageBuffer, Collection)} or
+   * {@link DynamicWorkUnitConsumer#addHandler(DynamicWorkUnitMessage.Handler)}
+   */
+  public void run() {
+    List<DynamicWorkUnitMessage> messages = getMessages(this.buffer);
+    for (DynamicWorkUnitMessage msg : messages) {
+      handleMessage(msg);
+    }
+  }
+
+  protected static List<DynamicWorkUnitMessage> getMessages(MessageBuffer<DynamicWorkUnitMessage> buffer) {
+    try {
+      LOG.debug("Fetching {} from the file buffer, {}",
+          DynamicWorkUnitMessage.class.getSimpleName(),
+          buffer.getClass().getSimpleName());
+      return buffer.get();
+    } catch (IOException e) {
+      throw new RuntimeException("Encountered exception while getting messages from the message buffer", e);

Review Comment:
   I'm not sure the application should explode loudly when there is an IOException. This feels a little brittle since dynamic work units are an "optional" feature, but I can also imagine the issue being difficult to uncover if we don't do this.
   
   Gobblin logs are very noisy and full of red-herring exceptions (exceptions that are "okay"). It may be hard to root-cause unless it explodes loudly or the person debugging is specifically looking for this log line.
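
On the "explode loudly" point: if `run()` is driven by `ScheduledExecutorService#scheduleAtFixedRate`, as the new Javadoc suggests, a `RuntimeException` thrown from it does not take down the application. Per the `ScheduledExecutorService` contract, an exception in any execution suppresses all subsequent executions and is only surfaced to a caller of `get()` on the returned `ScheduledFuture`, so polling can stop silently. Below is a minimal sketch of one possible middle ground, not what the PR implements: tolerate transient `IOException`s with a warning and escalate only after several consecutive failures. `MessageBuffer` and `MessageHandler` are hypothetical stand-ins for the PR's types (assuming only a `get()` that may throw `IOException`), `TolerantConsumer`, `maxConsecutiveFailures`, and `schedule` are illustrative names, and `java.util.logging` replaces SLF4J to keep the example dependency-free.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical stand-in for the PR's MessageBuffer<T>: only get() is assumed.
interface MessageBuffer<T> {
  List<T> get() throws IOException;
}

// Hypothetical stand-in for DynamicWorkUnitMessage.Handler.
interface MessageHandler<T> {
  void handle(T message);
}

/**
 * Illustrative polling consumer: logs and skips a poll on IOException,
 * but throws once maxConsecutiveFailures polls in a row have failed.
 */
class TolerantConsumer<T> implements Runnable {
  private static final Logger LOG = Logger.getLogger(TolerantConsumer.class.getName());

  private final MessageBuffer<T> buffer;
  private final List<MessageHandler<T>> handlers;
  private final int maxConsecutiveFailures;
  private int consecutiveFailures = 0;

  TolerantConsumer(MessageBuffer<T> buffer, Collection<MessageHandler<T>> handlers, int maxConsecutiveFailures) {
    this.buffer = buffer;
    // Copy into our own list instead of adding to the collection being iterated.
    this.handlers = new ArrayList<>(handlers);
    this.maxConsecutiveFailures = maxConsecutiveFailures;
  }

  @Override
  public void run() {
    List<T> messages;
    try {
      messages = buffer.get();
      consecutiveFailures = 0; // a successful read resets the failure streak
    } catch (IOException e) {
      consecutiveFailures++;
      if (consecutiveFailures >= maxConsecutiveFailures) {
        // Persistent failure: escalate instead of burying it in noisy logs.
        throw new IllegalStateException(
            "Message buffer read failed " + consecutiveFailures + " times in a row", e);
      }
      // Transient failure: warn and try again on the next scheduled run.
      LOG.warning("Skipping poll after IOException (" + consecutiveFailures
          + "/" + maxConsecutiveFailures + "): " + e);
      return;
    }
    // Every handler sees every message, in buffer order.
    for (T msg : messages) {
      for (MessageHandler<T> handler : handlers) {
        handler.handle(msg);
      }
    }
  }

  int getConsecutiveFailures() {
    return consecutiveFailures;
  }

  /**
   * Polls every periodMillis. If run() ever throws, the executor suppresses all
   * later runs; the exception is only visible via get() on the returned future.
   */
  static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable consumer, long periodMillis) {
    return executor.scheduleAtFixedRate(consumer, 0, periodMillis, TimeUnit.MILLISECONDS);
  }
}
```

With `maxConsecutiveFailures = 1` the sketch fails on the first `IOException`, like the PR's current code; larger values trade a later signal for tolerance of transient HDFS or Kafka hiccups. Either way, whoever owns the returned `ScheduledFuture` should check it, or a stopped poller may go unnoticed.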



-- 
This is an automated message from the Apache Git Service.