taegeonum commented on a change in pull request #130: [NEMO-233] Emit watermark at unbounded source
URL: https://github.com/apache/incubator-nemo/pull/130#discussion_r228879918
##########
File path:
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -303,46 +310,143 @@ private void finalizeVertex(final VertexHarness
vertexHarness) {
}
/**
+ * Process an element generated from the dataFetcher.
+ * If the element is an instance of Finishmark, we remove the dataFetcher
from the current list.
+ * @param element element
+ * @param dataFetcher current data fetcher
+ * @param dataFetchers current list
+ */
+ private void handleElement(final Object element,
+ final DataFetcher dataFetcher,
+ final List<DataFetcher> dataFetchers) {
+ if (element instanceof Finishmark) {
+ // We've consumed all the data from this data fetcher.
+ if (dataFetcher instanceof SourceVertexDataFetcher) {
+ boundedSourceReadTime += ((SourceVertexDataFetcher)
dataFetcher).getBoundedSourceReadTime();
+ } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+ serializedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getSerializedBytes();
+ encodedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getEncodedBytes();
+ }
+
+ // remove current data fetcher from the list
+ dataFetchers.remove(dataFetcher);
+ } else if (element instanceof Watermark) {
+ // Watermark
+ processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+ } else {
+ // Process data element
+ processElement(dataFetcher.getOutputCollector(), element);
+ }
+ }
+
+ /**
+ * Check if it is time to poll pending fetchers' data.
+ * @param pollingPeriod polling period
+ * @param currentTime current time
+ * @param prevTime prev time
+ */
+ private boolean isPollingTime(final long pollingPeriod,
+ final long currentTime,
+ final long prevTime) {
+ return (currentTime - prevTime) >= pollingPeriod;
+ }
+
+ /**
+ * This retrieves data from data fetchers and process them.
+ * It maintains two lists:
+ * -- availableFetchers: maintain data fetchers that currently have data
elements to retreive
+ * -- pendingFetchers: maintain data fetchers that currently do not have
available elements.
+ * This can become available in the future, and therefore we check the
pending fetchers every pollingInterval.
+ *
+ * If a data fetcher finishes, we remove it from the two lists.
+ * If a data fetcher has no available element, we move the data fetcher to
pendingFetchers
+ * If a pending data fetcher has element, we move it to availableFetchers
+ * If there are no available fetchers but pending fetchers, sleep for
pollingPeriod
+ * and retry fetching data from the pendingFetchers.
+ *
* @param fetchers to handle.
* @return false if IOException.
*/
private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
- final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers);
- while (!availableFetchers.isEmpty()) { // empty means we've consumed all
task-external input data
- // For this looping of available fetchers.
- int finishedFetcherIndex = NONE_FINISHED;
- for (int i = 0; i < availableFetchers.size(); i++) {
- final DataFetcher dataFetcher = availableFetchers.get(i);
- final Object element;
+ final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
+ final List<DataFetcher> pendingFetchers = new LinkedList<>();
+
+ // Polling interval.
+ final long pollingInterval = 100; // ms
+
+ // Previous polling time
+ long prevPollingTime = System.currentTimeMillis();
+
+ // empty means we've consumed all task-external input data
+ while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
+ // We first fetch data from available data fetchers
+ final Iterator<DataFetcher> availableIterator =
availableFetchers.iterator();
+
+ while (availableIterator.hasNext()) {
+ final DataFetcher dataFetcher = availableIterator.next();
try {
- element = dataFetcher.fetchDataElement();
- } catch (NoSuchElementException e) {
- // We've consumed all the data from this data fetcher.
- if (dataFetcher instanceof SourceVertexDataFetcher) {
- boundedSourceReadTime += ((SourceVertexDataFetcher)
dataFetcher).getBoundedSourceReadTime();
- } else if (dataFetcher instanceof ParentTaskDataFetcher) {
- serializedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getSerializedBytes();
- encodedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getEncodedBytes();
- }
- finishedFetcherIndex = i;
- break;
- } catch (IOException e) {
+ handleElement(dataFetcher.fetchDataElement(), dataFetcher,
availableFetchers);
+ } catch (final NoSuchElementException e) {
+ // No element in current data fetcher, fetch data from next fetcher
+ // move current data fetcher to pending.
+ availableIterator.remove();
+ pendingFetchers.add(dataFetcher);
+ } catch (final IOException e) {
// IOException means that this task should be retried.
taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
Optional.empty(),
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
LOG.error("{} Execution Failed (Recoverable: input read failure)!
Exception: {}", taskId, e);
return false;
}
+ }
+
+ final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
+ final long currentTime = System.currentTimeMillis();
+ // We check pending data every polling interval
+ while (pendingIterator.hasNext()
+ && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+ prevPollingTime = currentTime;
Review comment:
currentTime is updated at line 404.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services