homatthew commented on code in PR #3539:
URL: https://github.com/apache/gobblin/pull/3539#discussion_r957740785
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java:
##########
@@ -16,28 +16,68 @@
*/
package org.apache.gobblin.runtime.messaging;
-import java.time.Duration;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Abstraction for receiving {@link DynamicWorkUnitMessage} from {@link
DynamicWorkUnitProducer}.
* The class is responsible for fetching the messages from the messaging
service. All business logic
* should be done in the {@link DynamicWorkUnitMessage.Handler}.<br><br>
*
- * For polling implementations (e.g. HDFS or Kafka), you can use the
- * {@link DynamicWorkUnitUtils#runInBackground(Runnable, Duration)} to call the
+ * This consumer can be used to poll a message buffer (e.g. HDFS or Kafka)
using
+ * {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long,
TimeUnit)} to call the
* {@link Runnable#run()} method periodically in a background thread <br><br>
*
- * Push based implementations (e.g. helix or zk) can omit using this method
and instead setup the callback methods
- * without spawning a background thread
+ * Each new {@link DynamicWorkUnitMessage} is passed to a {@link
DynamicWorkUnitMessage.Handler}
+ * and will call {@link
DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)}
*/
-public abstract class DynamicWorkUnitConsumer implements Runnable {
- protected List<DynamicWorkUnitMessage.Handler> handlers = new ArrayList<>();
+public class DynamicWorkUnitConsumer implements Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicWorkUnitConsumer.class);
+ protected MessageBuffer<DynamicWorkUnitMessage> buffer;
+ protected List<DynamicWorkUnitMessage.Handler> handlers;
+
+ public DynamicWorkUnitConsumer(
+ MessageBuffer<DynamicWorkUnitMessage> buffer,
+ Collection<DynamicWorkUnitMessage.Handler> handlers) {
+ this.buffer = buffer;
+ for(DynamicWorkUnitMessage.Handler handler : handlers) {
+ handlers.add(handler);
+ }
+ }
+
+ /**
+ * Fetches all unread messages from sent by {@link DynamicWorkUnitProducer}
and
+ * calls {@link
DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)} method for each
handler added via
+ * {@link DynamicWorkUnitConsumer#DynamicWorkUnitConsumer(MessageBuffer,
Collection)} or
+ * {@link DynamicWorkUnitConsumer#addHandler(DynamicWorkUnitMessage.Handler)}
+ */
+ public void run() {
+ List<DynamicWorkUnitMessage> messages = getMessages(this.buffer);
+ for (DynamicWorkUnitMessage msg : messages) {
+ handleMessage(msg);
+ }
+ }
+
+ protected static List<DynamicWorkUnitMessage>
getMessages(MessageBuffer<DynamicWorkUnitMessage> buffer) {
+ try {
+ LOG.debug("Fetching {} from the file buffer, {}",
+ DynamicWorkUnitMessage.class.getSimpleName(),
+ buffer.getClass().getSimpleName());
+ return buffer.get();
+ } catch (IOException e) {
+ throw new RuntimeException("Encountered exception while getting messages
from the message buffer", e);
Review Comment:
I'm not sure the application should fail loudly when there is an
IOException here. It feels a little brittle, since dynamic work units are an
"optional" feature, but I can imagine this issue being difficult to uncover
if we don't.
Gobblin logs are very noisy and full of red-herring exceptions (exceptions
that are "okay"). A failure here may be hard to root-cause unless it fails
loudly or the person debugging is specifically looking for this log line.
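
One possible middle ground, sketched below under assumptions: tolerate a few
consecutive fetch failures (log and return an empty list), and only throw once
a threshold is reached. `TolerantFetchSketch`, its nested `Buffer` interface,
and `maxConsecutiveFailures` are hypothetical names for illustration and are
not part of this PR; `Buffer` stands in for `MessageBuffer` and assumes only a
`get()` that may throw `IOException`. Separately, per the
`ScheduledExecutorService` Javadoc, a task that throws has its subsequent
executions suppressed, so if `run()` is scheduled via `scheduleAtFixedRate`,
the `RuntimeException` would likely stop polling quietly rather than crash the
application, unless something inspects the returned future.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: fail soft on transient errors, fail loudly on repeated ones. */
public class TolerantFetchSketch {

  /** Stand-in for the PR's MessageBuffer; only a get() that may throw is assumed. */
  public interface Buffer<T> {
    List<T> get() throws IOException;
  }

  private final int maxConsecutiveFailures;
  private int consecutiveFailures = 0;

  public TolerantFetchSketch(int maxConsecutiveFailures) {
    this.maxConsecutiveFailures = maxConsecutiveFailures;
  }

  /** Returns the fetched messages, or an empty list while failures stay under the threshold. */
  public <T> List<T> fetch(Buffer<T> buffer) {
    try {
      List<T> messages = buffer.get();
      consecutiveFailures = 0; // a successful fetch resets the counter
      return messages;
    } catch (IOException e) {
      consecutiveFailures++;
      if (consecutiveFailures >= maxConsecutiveFailures) {
        // Repeated failures are unlikely to be transient, so surface them loudly.
        throw new RuntimeException(
            "Failed to fetch messages " + consecutiveFailures + " times in a row", e);
      }
      // Stand-in for LOG.error(...); a real implementation would use the class logger.
      System.err.println("Fetch failed (" + consecutiveFailures + "/"
          + maxConsecutiveFailures + "): " + e.getMessage());
      return Collections.emptyList();
    }
  }
}
```

With a threshold of 2, the first failed fetch yields an empty list and the
second one throws. Whether a threshold like this is preferable to failing
immediately is a judgment call for the PR author.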