[ https://issues.apache.org/jira/browse/METRON-728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880941#comment-15880941 ]
ASF GitHub Bot commented on METRON-728:
---------------------------------------
Github user justinleet commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/463#discussion_r102776136
--- Diff: metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java ---
@@ -97,88 +110,73 @@ public void testSequentialStreamLargeBatch() throws FileNotFoundException {
Map<String, Integer> count =
stream.map(s -> s.trim())
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertEquals(5, count.size());
- Assert.assertEquals(3, (int) count.get("foo"));
- Assert.assertEquals(2, (int) count.get("bar"));
- Assert.assertEquals(1, (int) count.get("and"));
- Assert.assertEquals(1, (int) count.get("the"));
+ validateMapCount(count);
}
}
- @Test
- public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
- //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(2);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.parallel().map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertTrue(threads.size() <= 2);
- }
- ).get();
- }
- }
+ private int getNumberOfBatches(final ReaderSpliterator spliterator) throws ExecutionException, InterruptedException {
+ final AtomicInteger numSplits = new AtomicInteger(0);
+ //we want to wrap the spliterator and count the (valid) splits
+ Spliterator<String> delegatingSpliterator = new Spliterator<String>() {
+ @Override
+ public boolean tryAdvance(Consumer<? super String> action) {
+ return spliterator.tryAdvance(action);
+ }
- @Test
- public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
- //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(10);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.parallel().map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
- }
- ).get();
- }
+ @Override
+ public Spliterator<String> trySplit() {
+ Spliterator<String> ret = spliterator.trySplit();
+ if(ret != null) {
+ numSplits.incrementAndGet();
+ }
+ return ret;
+ }
+
+ @Override
+ public long estimateSize() {
+ return spliterator.estimateSize();
+ }
+
+ @Override
+ public int characteristics() {
+ return spliterator.characteristics();
+ }
+ };
+
+ Stream<String> stream = StreamSupport.stream(delegatingSpliterator, true);
+
+ //now run it in a parallel pool and do some calculation that doesn't really matter.
+ ForkJoinPool forkJoinPool = new ForkJoinPool(10);
--- End diff --
Incredibly minor point, but since we no longer care about the actual
execution and aren't running it many times, it seems appropriate to just use
ForkJoinPool.commonPool() and drop the shutdown line.
Whether to change this is entirely up to you; I don't consider it
blocking by any means.
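The suggestion above can be sketched roughly as follows. This is a self-contained illustration, not the code from the PR: a plain `List` spliterator stands in for `ReaderSpliterator`, and `SplitCountingSketch`, `countingSplits`, and `countWords` are hypothetical names. It wraps the spliterator the same way the diff does, counts successful `trySplit()` calls, and runs the stream in `ForkJoinPool.commonPool()`, which is shared and needs no shutdown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SplitCountingSketch {

  // Hypothetical helper: wraps a spliterator and counts how often trySplit() succeeds.
  // The halves it hands out are not wrapped, so only top-level splits are counted.
  static <T> Spliterator<T> countingSplits(Spliterator<T> delegate, AtomicInteger numSplits) {
    return new Spliterator<T>() {
      @Override
      public boolean tryAdvance(Consumer<? super T> action) {
        return delegate.tryAdvance(action);
      }

      @Override
      public Spliterator<T> trySplit() {
        Spliterator<T> ret = delegate.trySplit();
        if (ret != null) {
          numSplits.incrementAndGet();
        }
        return ret;
      }

      @Override
      public long estimateSize() {
        return delegate.estimateSize();
      }

      @Override
      public int characteristics() {
        return delegate.characteristics();
      }
    };
  }

  // Hypothetical helper: word count over `lines` as a parallel stream in the common pool.
  static Map<String, Integer> countWords(List<String> lines, AtomicInteger numSplits)
      throws ExecutionException, InterruptedException {
    Stream<String> stream =
        StreamSupport.stream(countingSplits(lines.spliterator(), numSplits), true);
    Callable<Map<String, Integer>> task = () -> stream
        .map(String::trim)
        .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    // The common pool is shared and never shut down, so no shutdownNow() is needed.
    return ForkJoinPool.commonPool().submit(task).get();
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger numSplits = new AtomicInteger(0);
    List<String> lines = Arrays.asList("foo", "bar", "foo", "and", "the", "bar", "foo", "x", "y");
    Map<String, Integer> counts = countWords(lines, numSplits);
    System.out.println(counts + ", splits=" + numSplits.get());
  }
}
```

As in the diff, only splits of the outer spliterator are counted. The exact split count depends on the common pool's parallelism, so in this sketch only the word counts are deterministic.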
> ReaderSpliteratorTest fails randomly and extremely rarely
> ---------------------------------------------------------
>
> Key: METRON-728
> URL: https://issues.apache.org/jira/browse/METRON-728
> Project: Metron
> Issue Type: Bug
> Affects Versions: 0.3.1
> Reporter: Justin Leet
> Assignee: Justin Leet
>
> See logs at
> https://travis-ci.org/justinleet/incubator-metron/builds/203298348
> I was able to reproduce this locally by calling
> {{testActuallyParallel_mediumBatch}} in a {{while(true)}} loop. It can also
> occur in {{testActuallyParallel_mediumBatchImplicitlyParallel()}}. I also
> had to add {{forkJoinPool.shutdownNow();}} to the end of the test because,
> without it, OOM errors occur.
> My current assumption is that nothing guarantees the stream actually runs
> in parallel, so in extremely rare cases everything runs on a single
> thread.
> The time to failure has varied wildly, from a second or two to over a
> minute of looping before an assertion fails.
> We could just alter the assertion to be
> {code}Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) &&
> threads.size() >= 1);{code}
> This somewhat defeats the purpose of testing parallelism, but if parallelism
> isn't guaranteed, there's no great way to test it.
> Given the extreme rarity, we might just accept that, with
> {{threads.size() >= 1}}, the two tests occasionally hit the
> {{threads.size() == 1}} case.
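A rough sketch of the relaxed check proposed in the issue, with a hypothetical in-memory list in place of `getReader()` and hypothetical class/method names (`ThreadCountSketch`, `distinctThreads`). It counts distinct worker-thread names, accepts a single thread as a valid outcome, and calls `shutdownNow()` in a `finally` block so that looping the test does not keep piling up pools. Like the original tests, it relies on behavior not guaranteed by the Stream API: a parallel stream started inside a `ForkJoinPool` task runs in that pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class ThreadCountSketch {

  // Hypothetical helper: returns how many distinct threads processed the elements
  // of a parallel stream run inside a dedicated pool of `poolSize` workers.
  static int distinctThreads(List<String> lines, int poolSize)
      throws ExecutionException, InterruptedException {
    ForkJoinPool forkJoinPool = new ForkJoinPool(poolSize);
    try {
      Callable<Map<String, Integer>> task = () -> lines.parallelStream()
          .map(s -> Thread.currentThread().getName())
          .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
      return forkJoinPool.submit(task).get().size();
    } finally {
      // Always release the pool's workers; the issue reports OOM errors when
      // pools are created in a loop without shutting them down.
      forkJoinPool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> lines = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
    int threads = distinctThreads(lines, 10);
    // Parallelism is not guaranteed, so one thread is a legitimate outcome.
    // Each element maps to exactly one thread name, so the map can never hold
    // more entries than there are elements.
    if (threads < 1 || threads > lines.size()) {
      throw new AssertionError("unexpected thread count: " + threads);
    }
    System.out.println("threads=" + threads);
  }
}
```

Because this sketch does not batch its input, the upper bound it checks is the element count rather than the `ceil(9.0 / 2)` batch bound used in the tests.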
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)