Just an FYI:   this commit has caused things to not build in Eclipse, but I’m 
not exactly sure why.   The errors are in place where methods of the exact 
signature just moved into an internal class so I’m not yet sure why it’s 
causing an issue.

Description     Resource        Path    Location        Type
Bound mismatch: The type Read.Bounded<OutputT> is not a valid substitute for 
the bounded parameter <TransformT extends PTransform<? super InputT,OutputT>> 
of the type AppliedPTransform<InputT,OutputT,TransformT>     
BoundedReadEvaluatorFactory.java        
/beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct  line 
134        Java Problem


Dan




On 2016-10-06 18:31 (-0400), lc...@apache.org wrote: 
> Remove KeyedResourcePool
> 
> This interface is no longer used. Instead, the runner ensures that
> bundles will be provided containing the appropriate input to the
> TestStreamEvaluatorFactory.
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0
> 
> Branch: refs/heads/master
> Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
> Parents: 7306e16
> Author: Thomas Groh <tg...@google.com>
> Authored: Wed Oct 5 13:12:48 2016 -0700
> Committer: Luke Cwik <lc...@google.com>
> Committed: Thu Oct 6 15:14:38 2016 -0700
> 
> ----------------------------------------------------------------------
>  .../direct/BoundedReadEvaluatorFactory.java     |  40 +++--
>  .../beam/runners/direct/DirectRunner.java       |   2 +
>  .../beam/runners/direct/EmptyInputProvider.java |  49 ++++++
>  .../direct/ExecutorServiceParallelExecutor.java |  27 ++-
>  .../runners/direct/FlattenEvaluatorFactory.java |  18 +-
>  .../beam/runners/direct/KeyedResourcePool.java  |  47 ------
>  .../runners/direct/LockedKeyedResourcePool.java |  95 -----------
>  .../beam/runners/direct/RootInputProvider.java  |  41 +++++
>  .../runners/direct/RootProviderRegistry.java    |  65 ++++++++
>  .../direct/RootTransformEvaluatorFactory.java   |  42 -----
>  .../direct/TestStreamEvaluatorFactory.java      |  39 +++--
>  .../direct/TransformEvaluatorRegistry.java      |  17 +-
>  .../direct/UnboundedReadEvaluatorFactory.java   |  56 ++++---
>  .../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
>  .../direct/FlattenEvaluatorFactoryTest.java     |   3 +-
>  .../direct/LockedKeyedResourcePoolTest.java     | 163 -------------------
>  .../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
>  .../UnboundedReadEvaluatorFactoryTest.java      |   8 +-
>  18 files changed, 269 insertions(+), 449 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> ----------------------------------------------------------------------
> diff --git 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>  
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> index 4936ad9..326a535 100644
> --- 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> +++ 
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
>   * A {@link TransformEvaluatorFactory} that produces {@link 
> TransformEvaluator TransformEvaluators}
>   * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
>   */
> -final class BoundedReadEvaluatorFactory implements 
> RootTransformEvaluatorFactory {
> +final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory 
> {
>    private final EvaluationContext evaluationContext;
>  
>    BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
>      this.evaluationContext = evaluationContext;
>    }
>  
> -  @Override
> -  public Collection<CommittedBundle<?>> 
> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
> -    return createInitialSplits((AppliedPTransform) transform);
> -  }
> -
> -  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
> -      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
> -    BoundedSource<OutputT> source = transform.getTransform().getSource();
> -    return Collections.<CommittedBundle<?>>singleton(
> -        evaluationContext
> -            .<BoundedSourceShard<OutputT>>createRootBundle()
> -            
> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
> -            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
> -  }
> -
>    @SuppressWarnings({"unchecked", "rawtypes"})
>    @Override
>    @Nullable
> @@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements 
> RootTransformEvaluatorFactory
>  
>      abstract BoundedSource<T> getSource();
>    }
> +
> +  static class InputProvider implements RootInputProvider {
> +    private final EvaluationContext evaluationContext;
> +
> +    InputProvider(EvaluationContext evaluationContext) {
> +      this.evaluationContext = evaluationContext;
> +    }
> +
> +    @Override
> +    public Collection<CommittedBundle<?>> 
> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
> +      return createInitialSplits((AppliedPTransform) transform);
> +    }
> +
> +    private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
> +        AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
> +      BoundedSource<OutputT> source = transform.getTransform().getSource();
> +      return Collections.<CommittedBundle<?>>singleton(
> +          evaluationContext
> +              .<BoundedSourceShard<OutputT>>createRootBundle()
> +              
> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
> +              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
> +    }
> +  }
>  }
> 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
> ----------------------------------------------------------------------
> diff --git 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>  
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
> index 2ec4f08..67ec3e6 100644
> --- 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
> +++ 
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
> @@ -248,12 +248,14 @@ public class DirectRunner
>      // independent executor service for each run
>      ExecutorService executorService = executorServiceSupplier.get();
>  
> +    RootInputProvider rootInputProvider = 
> RootProviderRegistry.defaultRegistry(context);
>      TransformEvaluatorRegistry registry = 
> TransformEvaluatorRegistry.defaultRegistry(context);
>      PipelineExecutor executor =
>          ExecutorServiceParallelExecutor.create(
>              executorService,
>              consumerTrackingVisitor.getValueToConsumers(),
>              keyedPValueVisitor.getKeyedPValues(),
> +            rootInputProvider,
>              registry,
>              defaultModelEnforcements(options),
>              context);
> 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
> ----------------------------------------------------------------------
> diff --git 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>  
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
> new file mode 100644
> index 0000000..10d63e9
> --- /dev/null
> +++ 
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
> @@ -0,0 +1,49 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.beam.runners.direct;
> +
> +import java.util.Collection;
> +import java.util.Collections;
> +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
> +import org.apache.beam.sdk.transforms.AppliedPTransform;
> +import org.apache.beam.sdk.transforms.PTransform;
> +import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> +
> +/**
> + * A {@link RootInputProvider} that provides a singleton empty bundle.
> + */
> +class EmptyInputProvider implements RootInputProvider {
> +  private final EvaluationContext evaluationContext;
> +
> +  EmptyInputProvider(EvaluationContext evaluationContext) {
> +    this.evaluationContext = evaluationContext;
> +  }
> +
> +  /**
> +   * {@inheritDoc}.
> +   *
> +   * <p>Returns a single empty bundle. This bundle ensures that any {@link 
> PTransform PTransforms}
> +   * that consume from the output of the provided {@link AppliedPTransform} 
> have watermarks updated
> +   * as appropriate.
> +   */
> +  @Override
> +  public Collection<CommittedBundle<?>> 
> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
> +    return Collections.<CommittedBundle<?>>singleton(
> +        
> evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
> ----------------------------------------------------------------------
> diff --git 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>  
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
> index bb89699..52c45c3 100644
> --- 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
> +++ 
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
> @@ -17,6 +17,8 @@
>   */
>  package org.apache.beam.runners.direct;
>  
> +import static com.google.common.base.Preconditions.checkState;
> +
>  import com.google.auto.value.AutoValue;
>  import com.google.common.base.MoreObjects;
>  import com.google.common.base.Optional;
> @@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements 
> PipelineExecutor {
>  
>    private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
> valueToConsumers;
>    private final Set<PValue> keyedPValues;
> +  private final RootInputProvider rootInputProvider;
>    private final TransformEvaluatorRegistry registry;
>    @SuppressWarnings("rawtypes")
>    private final Map<Class<? extends PTransform>, 
> Collection<ModelEnforcementFactory>>
> @@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor implements 
> PipelineExecutor {
>        ExecutorService executorService,
>        Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
>        Set<PValue> keyedPValues,
> +      RootInputProvider rootInputProvider,
>        TransformEvaluatorRegistry registry,
>        @SuppressWarnings("rawtypes")
> -      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
> transformEnforcements,
> +          Map<Class<? extends PTransform>, 
> Collection<ModelEnforcementFactory>>
> +              transformEnforcements,
>        EvaluationContext context) {
>      return new ExecutorServiceParallelExecutor(
> -        executorService, valueToConsumers, keyedPValues, registry, 
> transformEnforcements, context);
> +        executorService,
> +        valueToConsumers,
> +        keyedPValues,
> +        rootInputProvider,
> +        registry,
> +        transformEnforcements,
> +        context);
>    }
>  
>    private ExecutorServiceParallelExecutor(
>        ExecutorService executorService,
>        Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
>        Set<PValue> keyedPValues,
> +      RootInputProvider rootInputProvider,
>        TransformEvaluatorRegistry registry,
>        @SuppressWarnings("rawtypes")
>        Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
> transformEnforcements,
> @@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements 
> PipelineExecutor {
>      this.executorService = executorService;
>      this.valueToConsumers = valueToConsumers;
>      this.keyedPValues = keyedPValues;
> +    this.rootInputProvider = rootInputProvider;
>      this.registry = registry;
>      this.transformEnforcements = transformEnforcements;
>      this.evaluationContext = context;
> @@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements 
> PipelineExecutor {
>    public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
>      for (AppliedPTransform<?, ?, ?> root : roots) {
>        ConcurrentLinkedQueue<CommittedBundle<?>> pending = new 
> ConcurrentLinkedQueue<>();
> -      pending.addAll(registry.getInitialInputs(root));
> +      Collection<CommittedBundle<?>> initialInputs = 
> rootInputProvider.getInitialInputs(root);
> +      checkState(
> +          !initialInputs.isEmpty(),
> +          "All root transforms must have initial inputs. Got 0 for %s",
> +          root.getFullName());
> +      pending.addAll(initialInputs);
>        pendingRootBundles.put(root, pending);
>      }
>      evaluationContext.initialize(pendingRootBundles);
> @@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements 
> PipelineExecutor {
>            LOG.debug("Executor Update: {}", update);
>            if (update.getBundle().isPresent()) {
>              if (ExecutorState.ACTIVE == startingState
> -                || (ExecutorState.PROCESSING == startingState && 
> noWorkOutstanding)) {
> +                || (ExecutorState.PROCESSING == startingState
> +                    && noWorkOutstanding)) {
>                scheduleConsumers(update);
>              } else {
>                allUpdates.offer(update);
> 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
> ----------------------------------------------------------------------
> diff --git 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>  
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
> index 90db040..57d5628 100644
> --- 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
> +++ 
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
> @@ -17,15 +17,12 @@
>   */
>  package org.apache.beam.runners.direct;
>  
> -import java.util.Collection;
> -import java.util.Collections;
>  import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
>  import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
>  import org.apache.beam.sdk.transforms.AppliedPTransform;
>  import org.apache.beam.sdk.transforms.Flatten;
>  import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
>  import org.apache.beam.sdk.transforms.PTransform;
> -import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>  import org.apache.beam.sdk.util.WindowedValue;
>  import org.apache.beam.sdk.values.PCollection;
>  import org.apache.beam.sdk.values.PCollectionList;
> @@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
>   * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link 
> Flatten}
>   * {@link PTransform}.
>   */
> -class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
> +class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
>    priva
[message truncated...] 


-- 
Daniel Kulp
dk...@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com

Reply via email to