Tidy WriteWithShardingFactory
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20244bad Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20244bad Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20244bad Branch: refs/heads/master Commit: 20244badc3d1b8defd9e5b9a718f54475c502365 Parents: cf14644 Author: Kenneth Knowles <[email protected]> Authored: Tue Jul 19 19:16:13 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Jul 25 09:30:32 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/WriteWithShardingFactory.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20244bad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 93f2408..d6ee6ea 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -78,7 +78,8 @@ class WriteWithShardingFactory implements PTransformOverrideFactory { Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of()) .withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); - final PCollectionView<Long> numRecords = records.apply(Count.<T>globally().asSingletonView()); + final PCollectionView<Long> numRecords = records + .apply("CountRecords", Count.<T>globally().asSingletonView()); PCollection<T> resharded = records .apply( @@ -107,7 +108,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory { private final PCollectionView<Long> numRecords; private final int randomExtraShards; private int currentShard; - private int maxShards; + private int maxShards = 0; KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) { this.numRecords = numRecords; @@ -116,7 +117,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory { @Override public void processElement(ProcessContext c) throws Exception { - if (maxShards == 0L) { + if (maxShards == 0) { maxShards = calculateShards(c.sideInput(numRecords)); currentShard = ThreadLocalRandom.current().nextInt(maxShards); }
