http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 68184de..b2d61c3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -17,6 +17,17 @@ */ package org.apache.beam.runners.direct; +import com.google.common.base.MoreObjects; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; @@ -46,23 +57,9 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - import org.joda.time.Duration; import org.joda.time.Instant; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded * {@link PCollection PCollections}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index a4705dd..4003983 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -17,15 +17,13 @@ */ package org.apache.beam.runners.direct; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.util.TimerInternals; - import org.joda.time.Instant; -import javax.annotation.Nullable; - /** * An implementation of {@link TimerInternals} where all relevant data exists in memory. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java index 3f4f2c6..0e15c18 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java @@ -18,21 +18,18 @@ package org.apache.beam.runners.direct; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.SerializableUtils; - import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Manages {@link DoFn} setup, teardown, and serialization. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java index 523273c..faa0615 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import org.apache.beam.sdk.util.WindowedValue; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index b9f159a..5af25bc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -19,6 +19,17 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -42,23 +53,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.MoreExecutors; - import org.joda.time.Instant; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - /** * The evaluation context for a specific pipeline being executed by the * {@link DirectRunner}. Contains state shared within the execution across all http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 35b6239..401ed7f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -17,20 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; - import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; @@ -39,10 +25,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -56,8 +38,22 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - import javax.annotation.Nullable; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An {@link PipelineExecutor} that uses an underlying {@link ExecutorService} and http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index d16ffa0..2da70bb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; @@ -40,10 +42,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import com.google.common.collect.ImmutableMap; - -import java.util.Collections; - /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link GroupByKeyOnly} {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index dbdbdaf..f085a39 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -17,9 +17,14 @@ */ package org.apache.beam.runners.direct; -import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -37,12 +42,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link GroupByKeyOnly} {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index 8be12fd..d5c0f0c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; @@ -29,10 +31,6 @@ import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.SetMultimap; - import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java index b0eb38f..1602f68 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import java.util.IdentityHashMap; +import java.util.Map; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -27,9 +29,6 @@ import org.apache.beam.sdk.util.MutationDetectors; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; -import java.util.IdentityHashMap; -import java.util.Map; - /** * {@link ModelEnforcement} that enforces elements are not modified over the course of processing * an element. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index 25a0d05..e79da7b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -19,15 +19,13 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; - import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 2fea00a..7c4376a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -19,15 +19,14 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import java.util.HashSet; +import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; -import java.util.HashSet; -import java.util.Set; - /** * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java index 77fa196..5a2b18d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java @@ -17,9 +17,8 @@ */ package org.apache.beam.runners.direct; -import org.joda.time.Instant; - import java.util.concurrent.TimeUnit; +import org.joda.time.Instant; /** * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 6ef0ffe..85a1c6a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -17,6 +17,12 @@ */ package org.apache.beam.runners.direct; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -34,14 +40,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import com.google.common.collect.ImmutableList; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - class ParDoEvaluator<T> implements TransformEvaluator<T> { public static <InputT, OutputT> ParDoEvaluator<InputT> create( EvaluationContext evaluationContext, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java index 2d05e68..6a41adf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.direct; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.util.Map; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -26,16 +30,9 @@ import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link BoundMulti} primitive {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index 97cbfa7..4bb740b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -17,6 +17,11 @@ */ package org.apache.beam.runners.direct; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -25,17 +30,9 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; - /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link Bound ParDo.Bound} primitive {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java index 76df11c..01a5c54 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java @@ -17,12 +17,11 @@ */ package org.apache.beam.runners.direct; +import java.util.Collection; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import java.util.Collection; - /** * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both * source and intermediate {@link PTransform PTransforms}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java index 6458215..cd459e4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java @@ -19,15 +19,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.PCollectionViewWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; - import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; @@ -36,7 +27,6 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -44,8 +34,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; - import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PCollectionViewWindow; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; /** * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java index 18fe04f..e18b2ac 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java @@ -17,11 +17,9 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.sdk.transforms.AppliedPTransform; - import com.google.common.base.MoreObjects; - import java.util.Objects; +import org.apache.beam.sdk.transforms.AppliedPTransform; /** * A (Step, Key) pair. This is useful as a map key or cache key for things that are available http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 12b18cb..1829e4a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -17,6 +17,12 @@ */ package org.apache.beam.runners.direct; +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -24,18 +30,8 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; - import org.joda.time.Instant; -import java.util.Collection; -import java.util.EnumSet; -import java.util.Set; - -import javax.annotation.Nullable; - /** * An immutable {@link TransformResult}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 90a83b0..e9f37ba 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -20,6 +20,11 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Supplier; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.Pipeline; @@ -41,18 +46,9 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.TimestampedValue; - -import com.google.common.base.Supplier; - import org.joda.time.Duration; import org.joda.time.Instant; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.annotation.Nullable; - /** * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index 3655d26..e9fa06b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; @@ -24,8 +25,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; -import javax.annotation.Nullable; - /** * A factory for creating instances of {@link TransformEvaluator} for the application of a * {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index c35e8b1..9edc50f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -19,6 +19,12 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -29,19 +35,9 @@ import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; - -import com.google.common.collect.ImmutableMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.annotation.Nullable; - /** * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} * implementations based on the type of {@link PTransform} of the application. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index d873bf5..cc6b5b7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -19,16 +19,14 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.util.WindowedValue; - import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; - import javax.annotation.Nullable; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.util.WindowedValue; /** * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java index ea15f03..876da9d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; import com.google.common.base.MoreObjects; - import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index c01fa56..0b08294 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -25,13 +27,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; - import org.joda.time.Instant; -import java.util.Set; - -import javax.annotation.Nullable; - /** * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java index 0246236..2371d3b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java @@ -17,18 +17,15 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.io.Read.Unbounded; -import org.apache.beam.sdk.transforms.PTransform; - import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; - -import org.joda.time.Duration; - import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.transforms.PTransform; +import org.joda.time.Duration; /** * Provides methods to determine if a record is a duplicate within the evaluation of a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index c4d408b..9f485e0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -17,6 +17,12 @@ */ package org.apache.beam.runners.direct; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.io.Read.Unbounded; @@ -29,18 +35,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.annotations.VisibleForTesting; - import org.joda.time.Instant; -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - /** * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java index 570dc90..41f7e8d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java @@ -17,14 +17,13 @@ */ package org.apache.beam.runners.direct; +import java.util.Map; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; -import java.util.Map; - /** * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the * {@link DirectRunner}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 3b0de4b..40ac7f0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; @@ -34,9 +36,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; -import java.util.ArrayList; -import java.util.List; - /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link CreatePCollectionView} primitive {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java index 0f73b1d..7961f24 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -17,19 +17,16 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowingStrategy; - import com.google.common.collect.ComparisonChain; import com.google.common.collect.Ordering; - -import org.joda.time.Instant; - import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.joda.time.Instant; /** * Executes callbacks that occur based on the progression of the watermark per-step. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index a44fa50..ff7428d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -19,18 +19,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; - import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -43,9 +31,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; import com.google.common.collect.TreeMultiset; - -import org.joda.time.Instant; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,8 +46,19 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; - import javax.annotation.Nullable; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.joda.time.Instant; /** * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index f2e62cb..19c1a98 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.direct; +import com.google.common.collect.Iterables; +import java.util.Collection; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -28,15 +31,8 @@ import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.Iterables; - import org.joda.time.Instant; -import java.util.Collection; - -import javax.annotation.Nullable; - /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the * {@link Bound Window.Bound} primitive {@link PTransform}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 1ab3403..d74cd56 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.io.Write.Bound; import org.apache.beam.sdk.transforms.Count; @@ -39,13 +41,8 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; - -import com.google.common.annotations.VisibleForTesting; - import org.joda.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - /** * A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms} * with an unspecified number of shards with a write with a specified number of shards. The number http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java index b131b4c..c8310c9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java @@ -21,10 +21,12 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; import org.apache.beam.sdk.util.ExecutionContext.StepContext; - import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -34,10 +36,6 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - /** * Tests for {@link AggregatorContainer}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 9bc4f7b..cbeb733 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -25,6 +25,11 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -40,9 +45,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableList; - import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -51,11 +53,6 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.NoSuchElementException; - /** * Tests for {@link BoundedReadEvaluatorFactory}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index a8c647e..efc6d2f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -21,6 +21,11 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -31,20 +36,12 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; - -import com.google.common.collect.ImmutableList; - import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.Serializable; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; - /** * Tests for {@link CommittedResult}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java index 529316c..1c9b5a6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.emptyIterable; import static org.junit.Assert.assertThat; +import java.io.Serializable; +import java.util.List; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -35,7 +37,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; - import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -43,9 +44,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.Serializable; -import java.util.List; - /** * Tests for {@link ConsumerTrackingPipelineVisitor}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java index d5cdee6..603e43e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java @@ -20,20 +20,17 @@ package org.apache.beam.runners.direct; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.ServiceLoader; import org.apache.beam.runners.direct.DirectRegistrar.Options; import org.apache.beam.runners.direct.DirectRegistrar.Runner; import org.apache.beam.sdk.options.PipelineOptionsRegistrar; import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.ServiceLoader; - /** Tests for {@link DirectRegistrar}. */ @RunWith(JUnit4.class) public class DirectRegistrarTest { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 9739adb..c7efac3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -22,6 +22,13 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderException; @@ -47,8 +54,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableMap; -import com.fasterxml.jackson.annotation.JsonValue; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -56,11 +61,6 @@ import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * Tests for basic {@link DirectRunner} functionality. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index c038910..51cfeed 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.state.StateNamespaces; - import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index 67f4ff4..2e4fee2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -25,18 +25,16 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowedValue; - import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.ArrayList; -import java.util.List; - /** * Tests for {@link DoFnLifecycleManagerRemovingTransformEvaluator}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java index 9da46f2..1f0af99 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java @@ -25,13 +25,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.theInstance; import static org.junit.Assert.assertThat; -import org.apache.beam.sdk.transforms.OldDoFn; - -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -40,6 +33,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; /** * Tests for {@link DoFnLifecycleManager}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java index 8be3d52..39a4a9d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java @@ -20,10 +20,10 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.equalTo; -import org.apache.beam.sdk.transforms.OldDoFn; - import com.google.common.collect.ImmutableList; - +import java.util.ArrayList; +import java.util.Collection; +import org.apache.beam.sdk.transforms.OldDoFn; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -34,9 +34,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.ArrayList; -import java.util.Collection; - /** * Tests for {@link DoFnLifecycleManagers}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java index b903ef1..e0ccbe5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java @@ -19,6 +19,10 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.isA; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; @@ -31,7 +35,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -39,11 +42,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; - /** * Tests for {@link EncodabilityEnforcementFactory}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 7ac0caa..f59dbba 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -25,6 +25,13 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; @@ -61,10 +68,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; @@ -72,12 +75,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * Tests for {@link EvaluationContext}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 0bc3036..1c46c24 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; - import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java index 9ea71d7..6abaf92 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; - import org.junit.Before; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 78736c4..8d1f8bd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -22,6 +22,9 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multiset; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -36,11 +39,6 @@ import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multiset; - import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 4afd64b..9f1e916 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -22,6 +22,9 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multiset; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -36,11 +39,6 @@ import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multiset; - import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index db934e5..d44151a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index e1be120..713ae35 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import java.io.Serializable; +import java.util.Collections; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -27,7 +29,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; - import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; @@ -36,9 +37,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.Serializable; -import java.util.Collections; - /** * Tests for {@link ImmutabilityEnforcementFactory}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java index 21e4bcb..43108f8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java @@ -21,6 +21,10 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -36,9 +40,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableList; - import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -49,10 +50,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; - /** * Tests for {@link ImmutableListBundleFactory}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index 9e273ad..ee6b2b4 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -21,6 +21,9 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.Set; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -35,9 +38,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableSet; - import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -45,9 +45,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.Collections; -import java.util.Set; - /** * Tests for {@link KeyedPValueTrackingVisitor}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 3208841..2a54ecb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -23,6 +23,13 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -45,9 +52,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; @@ -57,10 +61,6 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import javax.annotation.Nullable; /** * Tests for {@link ParDoEvaluator}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index 19094cb..5552196 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.Serializable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -59,7 +60,6 @@ import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.Serializable; /** * Tests for {@link ParDoMultiEvaluatorFactory}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index a4fd570..60b6dd9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.Serializable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -56,7 +57,6 @@ import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.Serializable; /** * Tests for {@link ParDoSingleEvaluatorFactory}.