Repository: incubator-beam
Updated Branches:
  refs/heads/master f0f4af581 -> fe17ef7f8
Fix FindBugs Errors in the Direct Runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa4e4ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa4e4ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa4e4ec Branch: refs/heads/master Commit: bfa4e4ece0091efc635c5c62c3eaf955f597fa39 Parents: f0f4af5 Author: Thomas Groh <tg...@google.com> Authored: Wed Nov 9 13:24:19 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Fri Nov 11 16:49:09 2016 -0800 ---------------------------------------------------------------------- runners/direct-java/pom.xml | 13 ------- .../runners/direct/AggregatorContainer.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 41 ++++++++++++-------- .../direct/WatermarkCallbackExecutor.java | 4 +- .../beam/runners/direct/WatermarkManager.java | 4 +- 5 files changed, 32 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 43cf3c0..8983b1c 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -40,19 +40,6 @@ </resource> </resources> - <pluginManagement> - <plugins> - <!-- BEAM-924 --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </pluginManagement> - <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java index 7b6bc64..e86bc3e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java @@ -43,7 +43,7 @@ public class AggregatorContainer { private final String name; private final CombineFn<InputT, AccumT, OutputT> combiner; @GuardedBy("this") - private AccumT accumulator = null; + private volatile AccumT accumulator = null; private boolean committed = false; private AggregatorInfo( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index d1ffea1..30fc417 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; @@ -30,12 +32,12 @@ import java.util.Collections; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -142,7 +144,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader()); this.allUpdates = new ConcurrentLinkedQueue<>(); - this.visibleUpdates = new ArrayBlockingQueue<>(20); + this.visibleUpdates = new LinkedBlockingQueue<>(); parallelExecutorService = TransformExecutorServices.parallel(executorService); defaultCompletionCallback = @@ -180,7 +182,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { @SuppressWarnings("unchecked") public void scheduleConsumption( AppliedPTransform<?, ?, ?> consumer, - @Nullable CommittedBundle<?> bundle, + CommittedBundle<?> bundle, CompletionCallback onComplete) { evaluateBundle(consumer, bundle, onComplete); } @@ -399,19 +401,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { pendingUpdate = allUpdates.poll(); } for (ExecutorUpdate update : updates) { - LOG.debug("Executor Update: {}", update); - if (update.getBundle().isPresent()) { - if (ExecutorState.ACTIVE == startingState - || (ExecutorState.PROCESSING == startingState - && noWorkOutstanding)) { - scheduleConsumers(update); - } else { - allUpdates.offer(update); - } - } else if (update.getException().isPresent()) { - visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())); - exceptionThrown = true; - } + applyUpdate(noWorkOutstanding, startingState, update); } addWorkIfNecessary(); } catch (InterruptedException e) { @@ -434,6 +424,25 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } } + private void applyUpdate( + boolean noWorkOutstanding, ExecutorState startingState, ExecutorUpdate 
update) { + LOG.debug("Executor Update: {}", update); + if (update.getBundle().isPresent()) { + if (ExecutorState.ACTIVE == startingState + || (ExecutorState.PROCESSING == startingState + && noWorkOutstanding)) { + scheduleConsumers(update); + } else { + allUpdates.offer(update); + } + } else if (update.getException().isPresent()) { + checkState( + visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())), + "VisibleUpdates should always be able to receive an offered update"); + exceptionThrown = true; + } + } + /** * Fires any available timers. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java index c8bf912..54cab7c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Ordering; +import java.io.Serializable; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -129,7 +130,8 @@ class WatermarkCallbackExecutor { } } - private static class CallbackOrdering extends Ordering<WatermarkCallback> { + private static class CallbackOrdering extends Ordering<WatermarkCallback> + implements Serializable { @Override public int compare(WatermarkCallback left, WatermarkCallback right) { return ComparisonChain.start() 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index fe2c2e5..a53c11c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -29,6 +29,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; import com.google.common.collect.TreeMultiset; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -1433,7 +1434,8 @@ public class WatermarkManager { } } - private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> { + private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> + implements Serializable { @Override public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) { return ComparisonChain.start()