johnyangk commented on a change in pull request #130: [NEMO-233] Emit watermark
at unbounded source
URL: https://github.com/apache/incubator-nemo/pull/130#discussion_r228858469
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
##########
@@ -93,90 +97,79 @@ public ObjectNode getPropertiesAsJsonNode() {
* @param <M> checkpoint mark type.
*/
private static final class UnboundedSourceReadable<O, M extends
UnboundedSource.CheckpointMark>
- implements Readable<WindowedValue<O>> {
+ implements Readable<Object> {
private final UnboundedSource<O, M> unboundedSource;
+ private UnboundedSource.UnboundedReader<O> reader;
+ private Function<O, WindowedValue<O>> windowedValueConverter;
+ private boolean finished = false;
UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
this.unboundedSource = unboundedSource;
}
@Override
- public Iterable<WindowedValue<O>> read() throws IOException {
- return new UnboundedSourceIterable<>(unboundedSource);
- }
+ public void prepare() {
+ try {
+ reader = unboundedSource.createReader(null, null);
+ reader.start();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
- @Override
- public List<String> getLocations() throws Exception {
- return new ArrayList<>();
+ // get first element
+ final O firstElement = retrieveFirstElement();
+ if (firstElement instanceof WindowedValue) {
+ windowedValueConverter = val -> (WindowedValue) val;
+ } else {
+ windowedValueConverter = WindowedValue::valueInGlobalWindow;
+ }
}
- }
-
- /**
- * The iterable class for unbounded sources.
- * @param <O> output type.
- * @param <M> checkpoint mark type.
- */
- private static final class UnboundedSourceIterable<O, M extends
UnboundedSource.CheckpointMark>
- implements Iterable<WindowedValue<O>> {
- private UnboundedSourceIterator<O, M> iterator;
-
- UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource)
throws IOException {
- this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+ private O retrieveFirstElement() {
+ while (true) {
+ try {
+ return reader.getCurrent();
+ } catch (final NoSuchElementException e) {
+ // the first element is not currently available... retry
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
Review comment:
throw new RuntimeException
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services