[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633118#comment-15633118
 ] 

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86312565
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
    @@ -0,0 +1,494 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators.async;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +/**
    + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
    + * and emit results from {@link AsyncCollector} to the next operators 
following it by
    + * calling {@link Output#collect(Object)}
    + */
    +@Internal
    +public class AsyncCollectorBuffer<IN, OUT> {
    +   private static final Logger LOG = 
LoggerFactory.getLogger(AsyncCollectorBuffer.class);
    +
    +   /**
    +    * Max number of {@link AsyncCollector} in the buffer.
    +    */
    +   private final int bufferSize;
    +
    +   private final AsyncDataStream.OutputMode mode;
    +
    +   private final AsyncWaitOperator<IN, OUT> operator;
    +
    +   /**
    +    * {@link AsyncCollector} queue.
    +    */
    +   private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new 
SimpleLinkedList<>();
    +   /**
    +    * A hash map keeping {@link AsyncCollector} and their corresponding 
{@link StreamElement}
    +    */
    +   private final Map<AsyncCollector<IN, OUT>, StreamElement> 
collectorToStreamElement = new HashMap<>();
    +   /**
    +    * A hash map keeping {@link AsyncCollector} and their node references 
in the #queue.
    +    */
    +   private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> 
collectorToQueue = new HashMap<>();
    +
    +   private final LinkedList<AsyncCollector> finishedCollectors = new 
LinkedList<>();
    +
    +   /**
    +    * {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
    +    */
    +   private TimestampedCollector<OUT> timestampedCollector;
    +   private Output<StreamRecord<OUT>> output;
    +
    +   /**
    +    * Locks and conditions to synchronize with main thread and emitter 
thread.
    +    */
    +   private final Lock lock;
    +   private final Condition notFull;
    +   private final Condition taskDone;
    +   private final Condition isEmpty;
    +
    +   /**
    +    * Error from user codes.
    +    */
    +   private volatile Exception error;
    +
    +   private final Emitter emitter;
    +   private final Thread emitThread;
    +
    +   private boolean isCheckpointing;
    +
    +   public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode 
mode, AsyncWaitOperator operator) {
    +           Preconditions.checkArgument(maxSize > 0, "Future buffer size 
should be greater than 0.");
    +
    +           this.bufferSize = maxSize;
    +           this.mode = mode;
    +           this.operator = operator;
    +
    +           this.lock = new ReentrantLock(true);
    +           this.notFull = this.lock.newCondition();
    +           this.taskDone = this.lock.newCondition();
    +           this.isEmpty = this.lock.newCondition();
    +
    +           this.emitter = new Emitter();
    +           this.emitThread = new Thread(emitter);
    +   }
    +
    +   /**
    +    * Add an {@link StreamRecord} into the buffer. A new {@link 
AsyncCollector} will be created and returned
    +    * corresponding to the input StreamRecord.
    +    * <p>
    +    * If buffer is full, caller will wait until new space is available.
    +    *
    +    * @param record StreamRecord
    +    * @return An AsyncCollector
    +    * @throws Exception InterruptedException or exceptions from 
AsyncCollector.
    +    */
    +   public AsyncCollector<IN, OUT> add(StreamRecord<IN> record) throws 
Exception {
    +           try {
    +                   lock.lock();
    +
    +                   notifyCheckpointDone();
    +
    +                   while (queue.size() >= bufferSize) {
    +                           notFull.await();
    +                   }
    +
    +                   // propagate error to the main thread
    +                   if (error != null) {
    +                           throw error;
    +                   }
    +
    +                   AsyncCollector<IN, OUT> collector = new 
AsyncCollector(this);
    +
    +                   collectorToQueue.put(collector, queue.add(collector));
    +                   collectorToStreamElement.put(collector, record);
    +
    +                   return collector;
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   /**
    +    * Add a {@link Watermark} into queue. A new AsyncCollector will be 
created and returned.
    +    * <p>
    +    * If queue is full, caller will be blocked here.
    +    *
    +    * @param watermark Watermark
    +    * @return AsyncCollector
    +    * @throws Exception Exceptions from async operation.
    +    */
    +   public AsyncCollector<IN, OUT> add(Watermark watermark) throws 
Exception {
    +           return processMark(watermark);
    +   }
    +
    +   /**
    +    * Add a {@link LatencyMarker} into queue. A new AsyncCollector will be 
created and returned.
    +    * <p>
    +    * If queue is full, caller will be blocked here.
    +    *
    +    * @param latencyMarker LatencyMarker
    +    * @return AsyncCollector
    +    * @throws Exception Exceptions from async operation.
    +    */
    +   public AsyncCollector<IN, OUT> add(LatencyMarker latencyMarker) throws 
Exception {
    +           return processMark(latencyMarker);
    +   }
    +
    +   private AsyncCollector<IN, OUT> processMark(StreamElement mark) throws 
Exception {
    +           try {
    +                   lock.lock();
    +
    +                   notifyCheckpointDone();
    +
    +                   while (queue.size() >= bufferSize)
    +                           notFull.await();
    +
    +                   if (error != null) {
    +                           throw error;
    +                   }
    +
    +                   AsyncCollector<IN, OUT> collector = new 
AsyncCollector(this, true);
    +
    +                   collectorToQueue.put(collector, queue.add(collector));
    +                   collectorToStreamElement.put(collector, mark);
    +
    +                   // signal emitter thread that current collector is ready
    +                   mark(collector);
    +
    +                   return collector;
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   /**
    +    * Notify the Emitter Thread that an AsyncCollector has completed.
    +    *
    +    * @param collector Completed AsyncCollector
    +    */
    +   void mark(AsyncCollector<IN, OUT> collector) {
    +           try {
    +                   lock.lock();
    +
    +                   if (mode == AsyncDataStream.OutputMode.UNORDERED) {
    +                           finishedCollectors.add(collector);
    +                   }
    +
    +                   taskDone.signal();
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   /**
    +    * Caller will wait here if buffer is not empty, meaning that not all 
async i/o tasks have returned yet.
    +    *
    +    * @throws Exception InterruptedException or Exceptions from 
AsyncCollector.
    +    */
    +   void waitEmpty() throws Exception {
    +           try {
    +                   lock.lock();
    +
    +                   notifyCheckpointDone();
    +
    +                   while (queue.size() != 0)
    +                           isEmpty.await();
    +
    +                   if (error != null) {
    +                           throw error;
    +                   }
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   public void startEmitterThread() {
    +           this.emitThread.start();
    +   }
    +
    +   public void stopEmitterThread() {
    +           emitter.stop();
    +
    +           emitThread.interrupt();
    +   }
    +
    +   /**
    +    * Get all StreamElements in the AsyncCollector queue.
    +    * <p>
    +    * Emitter Thread can not output records and will wait for a while due 
to isCheckpointing flag
    +    * until doing checkpoint has done.
    +    *
    +    * @return A List containing StreamElements.
    +    */
    +   public List<StreamElement> getStreamElementsInBuffer() {
    +           try {
    +                   lock.lock();
    +
    +                   // stop emitter thread
    +                   isCheckpointing = true;
    +
    +                   List<StreamElement> ret = new ArrayList<>();
    +                   for (int i = 0; i < queue.size(); ++i) {
    +                           AsyncCollector<IN, OUT> collector = 
queue.get(i);
    +                           
ret.add(collectorToStreamElement.get(collector));
    +                   }
    +
    +                   return ret;
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   public void setOutput(TimestampedCollector<OUT> collector, 
Output<StreamRecord<OUT>> output) {
    +           this.timestampedCollector = collector;
    +           this.output = output;
    +   }
    +
    +   public void notifyCheckpointDone() {
    +           this.isCheckpointing = false;
    +           this.taskDone.signalAll();
    +   }
    +
    +   /**
    +    * A working thread to output results from {@link AsyncCollector} to 
the next operator.
    +    */
    +   private class Emitter implements Runnable {
    +           private volatile boolean running = true;
    +
    +           private void output(AsyncCollector collector) throws Exception {
    +                   List<OUT> result = collector.getResult();
    +
    +                   // update timestamp for output stream records based on 
the input stream record.
    +                   StreamElement element = 
collectorToStreamElement.get(collector);
    +                   if (element == null) {
    +                           throw new Exception("No input stream record or 
watermark for current AsyncCollector: "+collector);
    +                   }
    +
    +                   if (element.isRecord()) {
    +                           if (result == null) {
    --- End diff --
    
    Shouldn't we check this in the `AsyncCollector`? We could then either fail 
there or store the exception as the result of the `AsyncCollector`.


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a bounded number of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to