[ https://issues.apache.org/jira/browse/METRON-728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880941#comment-15880941 ]

ASF GitHub Bot commented on METRON-728:
---------------------------------------

Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/463#discussion_r102776136
  
    --- Diff: metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java ---
    @@ -97,88 +110,73 @@ public void testSequentialStreamLargeBatch() throws FileNotFoundException {
           Map<String, Integer> count =
                   stream.map(s -> s.trim())
                            .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    -      Assert.assertEquals(5, count.size());
    -      Assert.assertEquals(3, (int) count.get("foo"));
    -      Assert.assertEquals(2, (int) count.get("bar"));
    -      Assert.assertEquals(1, (int) count.get("and"));
    -      Assert.assertEquals(1, (int) count.get("the"));
    +      validateMapCount(count);
         }
       }
     
    -  @Test
    -  public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
    -    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
    -    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
    -      ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    -      forkJoinPool.submit(() -> {
    -                Map<String, Integer> threads =
    -                        stream.parallel().map(s -> Thread.currentThread().getName())
    -                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    -                Assert.assertTrue(threads.size() <= 2);
    -              }
    -      ).get();
    -    }
    -  }
    +  private int getNumberOfBatches(final ReaderSpliterator spliterator) throws ExecutionException, InterruptedException {
    +    final AtomicInteger numSplits = new AtomicInteger(0);
    +    //we want to wrap the spliterator and count the (valid) splits
    +    Spliterator<String> delegatingSpliterator = new Spliterator<String>() {
    +      @Override
    +      public boolean tryAdvance(Consumer<? super String> action) {
    +        return spliterator.tryAdvance(action);
    +      }
     
    -  @Test
    -  public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
    -    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
    -    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
    -      ForkJoinPool forkJoinPool = new ForkJoinPool(10);
    -      forkJoinPool.submit(() -> {
    -                Map<String, Integer> threads =
    -                        stream.parallel().map(s -> Thread.currentThread().getName())
    -                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    -                Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
    -              }
    -      ).get();
    -    }
    +      @Override
    +      public Spliterator<String> trySplit() {
    +        Spliterator<String> ret = spliterator.trySplit();
    +        if(ret != null) {
    +          numSplits.incrementAndGet();
    +        }
    +        return ret;
    +      }
    +
    +      @Override
    +      public long estimateSize() {
    +        return spliterator.estimateSize();
    +      }
    +
    +      @Override
    +      public int characteristics() {
    +        return spliterator.characteristics();
    +      }
    +    };
    +
    +    Stream<String> stream = StreamSupport.stream(delegatingSpliterator, true);
    +
    +    //now run it in a parallel pool and do some calculation that doesn't really matter.
    +    ForkJoinPool forkJoinPool = new ForkJoinPool(10);
    --- End diff --
    
    Incredibly minor point, but since we no longer care about the actual
    execution and aren't running it many times, it seems appropriate to just use
    ForkJoinPool.commonPool() and drop the shutdown line.
    
    Whether to change it is entirely up to you; I don't consider it
    blocking by any means.
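The reviewer's suggestion could look roughly like the following. It is a minimal sketch, not the code from the pull request: it assumes a `List<String>` source in place of `ReaderSpliterator`, and `CommonPoolSplitCount`, `countingSplits`, and `countWords` are hypothetical names. Like the diff, it wraps a spliterator to count successful `trySplit()` calls; unlike the diff, it hands the work to `ForkJoinPool.commonPool()`, so there is no private pool to shut down.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class CommonPoolSplitCount {

  // Hypothetical helper: wraps a spliterator and counts the trySplit() calls that
  // actually produced a split. Only splits of the wrapped spliterator itself are
  // counted; the returned prefix is handed back unwrapped, as in the diff.
  static <T> Spliterator<T> countingSplits(Spliterator<T> delegate, AtomicInteger numSplits) {
    return new Spliterator<T>() {
      @Override
      public boolean tryAdvance(Consumer<? super T> action) {
        return delegate.tryAdvance(action);
      }

      @Override
      public Spliterator<T> trySplit() {
        Spliterator<T> prefix = delegate.trySplit();
        if (prefix != null) {
          numSplits.incrementAndGet();
        }
        return prefix;
      }

      @Override
      public long estimateSize() {
        return delegate.estimateSize();
      }

      @Override
      public int characteristics() {
        return delegate.characteristics();
      }
    };
  }

  // Hypothetical helper: counts trimmed words in parallel on the shared common pool.
  // There is no private pool, so nothing needs to be shut down afterwards.
  static Map<String, Integer> countWords(List<String> lines, AtomicInteger numSplits) {
    Spliterator<String> counting = countingSplits(lines.spliterator(), numSplits);
    Callable<Map<String, Integer>> job = () -> StreamSupport.stream(counting, true)
        .map(String::trim)
        .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    // join() returns the result and rethrows failures as unchecked exceptions.
    return ForkJoinPool.commonPool().submit(job).join();
  }

  public static void main(String[] args) {
    // Hypothetical sample input, not the test's real fixture.
    List<String> lines = Arrays.asList("foo", " bar", "foo ", "and", "the", "bar", "foo");
    AtomicInteger numSplits = new AtomicInteger(0);
    Map<String, Integer> counts = countWords(lines, numSplits);
    System.out.println(counts + " after " + numSplits.get() + " split(s)");
  }
}
```

Parallel streams already run on the common pool by default, so submitting the job there mostly keeps the shape of the original test. How many splits occur is decided by the stream framework, so only bounds on the split count, not an exact value, are safe to assert.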


> ReaderSpliteratorTest fails randomly and extremely rarely
> ---------------------------------------------------------
>
>                 Key: METRON-728
>                 URL: https://issues.apache.org/jira/browse/METRON-728
>             Project: Metron
>          Issue Type: Bug
>    Affects Versions: 0.3.1
>            Reporter: Justin Leet
>            Assignee: Justin Leet
>
> See logs at
> https://travis-ci.org/justinleet/incubator-metron/builds/203298348
> I was able to reproduce this locally by calling 
> {{testActuallyParallel_mediumBatch}} in a {{while(true)}} loop. It can also 
> occur in {{testActuallyParallel_mediumBatchImplicitlyParallel()}}.  I also 
> had to add 
> {{forkJoinPool.shutdownNow();}} to the end of the test, because otherwise OOM 
> errors occur.
> My current assumption is that nothing guarantees the stream ever actually runs
> in parallel, so in extremely rare cases everything runs on a single thread.
> The time to failure varies wildly when I hit it: sometimes the assertion fails
> within a second or two, sometimes only after the loop has run for over a minute.
> We could just alter the assertion to be
> {code}Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() >= 1);{code}
> This somewhat defeats the purpose of testing the parallelism, but if there's no
> guarantee we actually get parallelism, there's no great way to test it.
> Given the extreme rarity, we might just accept that, with the relaxed
> {{threads.size() >= 1}} check, the two tests occasionally pass via the
> {{threads.size() == 1}} case.
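The reasoning in the issue can be illustrated with a small Java sketch. It is not the Metron test itself: it assumes a plain `List<String>` in place of `ReaderSpliterator`, and `RelaxedParallelismCheck`, `maxBatches`, `withinRelaxedBounds`, and `distinctThreadsUsed` are hypothetical names. It computes the `ceil(9/2) = 5` batch bound with integer arithmetic, encodes the proposed relaxed assertion, and runs a parallel stream in a dedicated `ForkJoinPool` that is always shut down in a `finally` block, in the spirit of the `forkJoinPool.shutdownNow()` workaround mentioned in the issue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class RelaxedParallelismCheck {

  // Hypothetical helper: ceil(numElements / batchSize) in integer arithmetic, e.g. ceil(9 / 2) = 5.
  static int maxBatches(int numElements, int batchSize) {
    return (numElements + batchSize - 1) / batchSize;
  }

  // Hypothetical helper encoding the relaxed assertion: at least one thread, at most one per batch.
  // Nothing forces the framework to use a second thread, so ">= 1" is the only safe lower bound.
  static boolean withinRelaxedBounds(int threadsUsed, int numElements, int batchSize) {
    return threadsUsed >= 1 && threadsUsed <= maxBatches(numElements, batchSize);
  }

  // Hypothetical helper: runs a parallel stream inside a dedicated pool and returns how many
  // distinct threads processed at least one element. The pool is always shut down, so calling
  // this in a while(true) reproduction loop does not keep piling up live pools.
  static int distinctThreadsUsed(List<String> lines, int poolSize) {
    ForkJoinPool pool = new ForkJoinPool(poolSize);
    try {
      Callable<Map<String, Integer>> job = () -> lines.parallelStream()
          .map(s -> Thread.currentThread().getName())
          .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
      return pool.submit(job).join().size();
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("batches: " + maxBatches(9, 2));                       // 5
    System.out.println("max threads, pool of 2: " + Math.min(maxBatches(9, 2), 2)); // 2
    // Hypothetical 9-element input; a plain List is not split into batches of 2,
    // so only the ">= 1" half of the relaxed check applies to it.
    List<String> lines = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
    int threads = distinctThreadsUsed(lines, 10);
    System.out.println("distinct threads: " + threads + ", at least one: " + (threads >= 1));
  }
}
```

Because only `threads >= 1` is guaranteed, a thread-count assertion cannot prove parallelism on its own, which is presumably why the diff above moves toward counting successful splits with `getNumberOfBatches` instead of counting threads.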



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
