[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15306278#comment-15306278 ]

ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r65027557
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---
    @@ -42,34 +44,38 @@
      * Combine operator for Reduce functions, standalone (not chained).
      * Sorts and groups and reduces data, but never spills the sort. May produce multiple
      * partially aggregated groups.
    - * 
    + *
      * @param <T> The data type consumed and produced by the combiner.
      */
     public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
    -   
    +
        private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class);
     
        /** Fix length records with a length below this threshold will be in-place sorted, if possible. */
        private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    -   
    -   
    +
    +
        private TaskContext<ReduceFunction<T>, T> taskContext;
     
        private TypeSerializer<T> serializer;
     
        private TypeComparator<T> comparator;
    -   
    +
        private ReduceFunction<T> reducer;
    -   
    +
        private Collector<T> output;
    -   
    +
    +   private DriverStrategy strategy;
    +
        private InMemorySorter<T> sorter;
    -   
    +
        private QuickSort sortAlgo = new QuickSort();
     
    +   private ReduceHashTable<T> table;
    +
        private List<MemorySegment> memory;
     
    -   private boolean running;
    +   private volatile boolean canceled;
    --- End diff --
    
    I think it's better to have `volatile` here. This variable will be set from 
a different thread, and `volatile` is not only about atomicity, but also about 
memory consistency (seeing the effect of a write made in another thread). If a 
variable is not volatile, then the compiler may assume in certain cases that it 
is only modified and read by one thread (sections 17.4.2 to 17.4.5 in [1]). 
(Also see [2].)
    
    Note: omitting `volatile` probably wouldn't cause an actual bug here, 
because the loop bodies are large, so the compiler probably won't inline and 
analyze the entire call tree to look for writes to this flag, but I wouldn't 
risk it. Also, I don't know how often this causes actual problems in Java, but 
it did happen to me once in C++ that the compiler effectively turned a loop 
like this into `while(true)` because my flag was not volatile. It was a nasty 
debugging session. (Another problematic thing that the compiler is allowed to 
do with a non-volatile variable is to cache its value in a register and never 
re-read it from memory, as long as it can turn all possible writes to it by the 
current thread into writes to that register.)
    
    [1] http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.3
    [2] http://stackoverflow.com/questions/106591/do-you-ever-use-the-volatile-keyword-in-java
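
The visibility argument in the comment above can be illustrated with a minimal, self-contained sketch. It is not the code of `ReduceCombineDriver`; the class `CancelFlagSketch` and its methods `run()`, `cancel()` and `demo()` are hypothetical names chosen for this example. A worker thread spins on a flag that another thread sets, and declaring the flag `volatile` is what guarantees that the worker eventually observes the write.

```java
import java.util.concurrent.TimeUnit;

public class CancelFlagSketch {

    // Hypothetical stand-in for the driver's flag. 'volatile' guarantees that
    // a write made by the canceling thread becomes visible to the worker.
    private volatile boolean canceled;

    // Only touched by the worker thread; read by others after join().
    private long iterations;

    // Worker loop: re-reads the volatile flag on every iteration.
    public void run() {
        while (!canceled) {
            iterations++;
        }
    }

    // Called from a different thread than the one executing run().
    public void cancel() {
        canceled = true;
    }

    public long iterations() {
        return iterations;
    }

    // Starts a worker, cancels it from the calling thread, and reports
    // whether the worker terminated within the timeout.
    public static boolean demo() {
        CancelFlagSketch task = new CancelFlagSketch();
        Thread worker = new Thread(task::run);
        worker.setDaemon(true); // do not keep the JVM alive if the loop hangs
        worker.start();
        try {
            TimeUnit.MILLISECONDS.sleep(50);
            task.cancel();
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + demo());
    }
}
```

Without `volatile`, the Java memory model would permit the JIT compiler to hoist the read of `canceled` out of the loop, so the worker might never stop. Whether that happens in practice depends on the JVM and on the loop body, which is the uncertainty the comment describes.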


> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows holding the preaggregated value in a hash table 
> and updating it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> new value has the same binary length as the old value) but also appending 
> updates with invalidation of the old value (if the binary length of the new 
> value differs). The hash table needs to be able to evict and emit all elements 
> if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
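
The strategy described in the issue can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions, not the `ReduceHashTable` from the pull request: it uses `java.util.HashMap` on heap objects instead of a table on managed memory segments, it approximates "out of memory" with a fixed entry limit, and the names `HashCombineSketch`, `add`, `flush` and `demo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class HashCombineSketch<K, T> {

    private final Map<K, T> table = new HashMap<>();
    private final Function<T, K> keyOf;       // extracts the grouping key
    private final BinaryOperator<T> reducer;  // plays the role of reduce(v1, v2)
    private final int maxEntries;             // assumed stand-in for the memory limit
    private final List<T> emitted = new ArrayList<>();

    public HashCombineSketch(Function<T, K> keyOf, BinaryOperator<T> reducer, int maxEntries) {
        this.keyOf = keyOf;
        this.reducer = reducer;
        this.maxEntries = maxEntries;
    }

    // Folds one record into the preaggregated value stored for its key.
    public void add(T record) {
        K key = keyOf.apply(record);
        if (!table.containsKey(key) && table.size() >= maxEntries) {
            flush(); // table is full: evict and emit all partial aggregates
        }
        // Inserts the record if the key is new, otherwise stores reduce(old, record).
        table.merge(key, record, reducer);
    }

    // Emits every partial aggregate and empties the table.
    public void flush() {
        emitted.addAll(table.values());
        table.clear();
    }

    public List<T> emitted() {
        return emitted;
    }

    // Counts words with a table that holds at most two keys at a time.
    public static List<Map.Entry<String, Integer>> demo() {
        HashCombineSketch<String, Map.Entry<String, Integer>> combiner =
            new HashCombineSketch<>(
                Map.Entry::getKey,
                (a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue()),
                2);
        for (String word : new String[] {"a", "b", "a", "c", "a"}) {
            combiner.add(Map.entry(word, 1));
        }
        combiner.flush(); // end of input: emit whatever is left
        return combiner.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the table is flushed when it fills up, the same key can be emitted more than once as separate partial aggregates (the key `a` in the demo). That is acceptable for a combiner, since the final reduce step merges the partial results.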



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
