johnyangk commented on a change in pull request #130: [NEMO-233] Emit watermark at unbounded source
URL: https://github.com/apache/incubator-nemo/pull/130#discussion_r228863064
 
 

 ##########
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 ##########
 @@ -20,49 +20,92 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Fetches data from a data source.
  */
 class SourceVertexDataFetcher extends DataFetcher {
   private final Readable readable;
-
-  // Non-finals (lazy fetching)
-  private Iterator iterator;
   private long boundedSourceReadTime = 0;
+  private static final long WATERMARK_PERIOD = 1000; // ms
+  private final ScheduledExecutorService watermarkTriggerService;
+  private boolean watermarkTriggered = false;
+  private final boolean bounded;
 
-  SourceVertexDataFetcher(final IRVertex dataSource,
+  SourceVertexDataFetcher(final SourceVertex dataSource,
                           final Readable readable,
                           final OutputCollector outputCollector) {
     super(dataSource, outputCollector);
     this.readable = readable;
+    this.readable.prepare();
+    this.bounded = dataSource.isBounded();
+
+    if (!bounded) {
+      this.watermarkTriggerService = Executors.newScheduledThreadPool(1);
+      this.watermarkTriggerService.scheduleAtFixedRate(() -> {
+        watermarkTriggered = true;
+      }, WATERMARK_PERIOD, WATERMARK_PERIOD, TimeUnit.MILLISECONDS);
+    } else {
+      this.watermarkTriggerService = null;
+    }
   }
 
+  /**
+   * This is non-blocking operation.
+   * @return current data
+   * @throws NoSuchElementException if the current data is not available
+   */
   @Override
-  Object fetchDataElement() throws IOException {
-    if (iterator == null) {
-      fetchDataLazily();
+  Object fetchDataElement() throws NoSuchElementException, IOException {
+    if (readable.isFinished()) {
+      return Finishmark.getInstance();
+    } else {
+      final long start = System.currentTimeMillis();
+      final Object element = retrieveElement();
+      boundedSourceReadTime += System.currentTimeMillis() - start;
+      return element;
     }
+  }
 
-    if (iterator.hasNext()) {
-      return iterator.next();
-    } else {
-      throw new NoSuchElementException();
+  final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
+
+  @Override
+  public void close() throws Exception {
+    readable.close();
+    if (watermarkTriggerService != null) {
+      watermarkTriggerService.shutdown();
     }
   }
 
-  private void fetchDataLazily() throws IOException {
-    final long start = System.currentTimeMillis();
-    iterator = this.readable.read().iterator();
-    boundedSourceReadTime += System.currentTimeMillis() - start;
+  private boolean isWatermarkTriggerTime() {
+    if (watermarkTriggered) {
+      watermarkTriggered = false;
+      return true;
+    } else {
+      return false;
+    }
   }
 
-  final long getBoundedSourceReadTime() {
-    return boundedSourceReadTime;
+  private Object retrieveElement() throws IOException {
 
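 Review comment:
  NoSuchElementException — `retrieveElement` should also declare `throws NoSuchElementException`, matching `fetchDataElement` and its Javadoc, since it can throw it when no data is currently available.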

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to