johnyangk commented on a change in pull request #130: [NEMO-233] Emit watermark 
at unbounded source 
URL: https://github.com/apache/incubator-nemo/pull/130#discussion_r228858469
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
 ##########
 @@ -93,90 +97,79 @@ public ObjectNode getPropertiesAsJsonNode() {
    * @param <M> checkpoint mark type.
    */
   private static final class UnboundedSourceReadable<O, M extends 
UnboundedSource.CheckpointMark>
-      implements Readable<WindowedValue<O>> {
+      implements Readable<Object> {
     private final UnboundedSource<O, M> unboundedSource;
+    private UnboundedSource.UnboundedReader<O> reader;
+    private Function<O, WindowedValue<O>> windowedValueConverter;
+    private boolean finished = false;
 
     UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
       this.unboundedSource = unboundedSource;
     }
 
     @Override
-    public Iterable<WindowedValue<O>> read() throws IOException {
-      return new UnboundedSourceIterable<>(unboundedSource);
-    }
+    public void prepare() {
+      try {
+        reader = unboundedSource.createReader(null, null);
+        reader.start();
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    @Override
-    public List<String> getLocations() throws Exception {
-      return new ArrayList<>();
+      // get first element
+      final O firstElement = retrieveFirstElement();
+      if (firstElement instanceof WindowedValue) {
+        windowedValueConverter = val -> (WindowedValue) val;
+      } else {
+        windowedValueConverter = WindowedValue::valueInGlobalWindow;
+      }
     }
-  }
-
-  /**
-   * The iterable class for unbounded sources.
-   * @param <O> output type.
-   * @param <M> checkpoint mark type.
-   */
-  private static final class UnboundedSourceIterable<O, M extends 
UnboundedSource.CheckpointMark>
-      implements Iterable<WindowedValue<O>> {
 
-    private UnboundedSourceIterator<O, M> iterator;
-
-    UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource) 
throws IOException {
-      this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+    private O retrieveFirstElement() {
+      while (true) {
+        try {
+          return reader.getCurrent();
+        } catch (final NoSuchElementException e) {
+          // the first element is not currently available... retry
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e1) {
+            e1.printStackTrace();
 
 Review comment:
   Please throw a new RuntimeException (wrapping the InterruptedException) here instead of only calling printStackTrace().

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to