I just submitted a pull request that fixes the code and also cherry-picks the 
.travis.yml change from the last branch.

Dan



> On Oct 13, 2016, at 10:48 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
> Indeed, the .travis.yml has not been merged. I'm going to fix that.
> 
> Sorry about that.
> 
> Regards
> JB
> 
> On 10/13/2016 04:37 PM, Daniel Kulp wrote:
>> 
>> This is in m2e. That said, it looks like the .travis.yml file wasn’t 
>> merged from my “eclipse” branch, so Travis wasn’t actually running against the 
>> Eclipse compiler. That would have caught this. JB and I will investigate 
>> how that got lost in the merge to master.
>> 
>> Running "mvn -Peclipse-jdt clean install" in direct-java would show the same error.
>> 
>> 
>> Dan
>> 
>> 
>> 
>>> On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>> 
>>> Hi Dan,
>>> 
>>> I guess you mean building directly in Eclipse, using m2e?
>>> 
>>> Regards
>>> JB
>>> 
>>> On 10/13/2016 03:59 PM, Daniel Kulp wrote:
>>>> 
>>>> Just an FYI: this commit has caused things to not build in Eclipse, but 
>>>> I’m not exactly sure why. The errors are in a place where methods with the 
>>>> exact same signature just moved into a nested class, so I’m not yet sure why 
>>>> it’s causing an issue.
>>>> 
>>>> Description: Bound mismatch: The type Read.Bounded<OutputT> is not a valid 
>>>>   substitute for the bounded parameter <TransformT extends PTransform<? super 
>>>>   InputT,OutputT>> of the type AppliedPTransform<InputT,OutputT,TransformT>
>>>> Resource:    BoundedReadEvaluatorFactory.java
>>>> Path:        /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
>>>> Location:    line 134
>>>> Type:        Java Problem
>>>> 
>>>> 
>>>> Dan
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:
>>>>> Remove KeyedResourcePool
>>>>> 
>>>>> This interface is no longer used. Instead, the runner ensures that
>>>>> bundles will be provided containing the appropriate input to the
>>>>> TestStreamEvaluatorFactory.
>>>>> 
>>>>> 
>>>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
>>>>> Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
>>>>> Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
>>>>> Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0
>>>>> 
>>>>> Branch: refs/heads/master
>>>>> Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
>>>>> Parents: 7306e16
>>>>> Author: Thomas Groh <tg...@google.com>
>>>>> Authored: Wed Oct 5 13:12:48 2016 -0700
>>>>> Committer: Luke Cwik <lc...@google.com>
>>>>> Committed: Thu Oct 6 15:14:38 2016 -0700
>>>>> 
>>>>> ----------------------------------------------------------------------
>>>>> .../direct/BoundedReadEvaluatorFactory.java     |  40 +++--
>>>>> .../beam/runners/direct/DirectRunner.java       |   2 +
>>>>> .../beam/runners/direct/EmptyInputProvider.java |  49 ++++++
>>>>> .../direct/ExecutorServiceParallelExecutor.java |  27 ++-
>>>>> .../runners/direct/FlattenEvaluatorFactory.java |  18 +-
>>>>> .../beam/runners/direct/KeyedResourcePool.java  |  47 ------
>>>>> .../runners/direct/LockedKeyedResourcePool.java |  95 -----------
>>>>> .../beam/runners/direct/RootInputProvider.java  |  41 +++++
>>>>> .../runners/direct/RootProviderRegistry.java    |  65 ++++++++
>>>>> .../direct/RootTransformEvaluatorFactory.java   |  42 -----
>>>>> .../direct/TestStreamEvaluatorFactory.java      |  39 +++--
>>>>> .../direct/TransformEvaluatorRegistry.java      |  17 +-
>>>>> .../direct/UnboundedReadEvaluatorFactory.java   |  56 ++++---
>>>>> .../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
>>>>> .../direct/FlattenEvaluatorFactoryTest.java     |   3 +-
>>>>> .../direct/LockedKeyedResourcePoolTest.java     | 163 -------------------
>>>>> .../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
>>>>> .../UnboundedReadEvaluatorFactoryTest.java      |   8 +-
>>>>> 18 files changed, 269 insertions(+), 449 deletions(-)
>>>>> ----------------------------------------------------------------------
>>>>> 
>>>>> 
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>>>>  
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>>>> index 4936ad9..326a535 100644
>>>>> --- 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>>>> +++ 
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>>>> @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
>>>>> * A {@link TransformEvaluatorFactory} that produces {@link 
>>>>> TransformEvaluator TransformEvaluators}
>>>>> * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
>>>>> */
>>>>> -final class BoundedReadEvaluatorFactory implements 
>>>>> RootTransformEvaluatorFactory {
>>>>> +final class BoundedReadEvaluatorFactory implements 
>>>>> TransformEvaluatorFactory {
>>>>>  private final EvaluationContext evaluationContext;
>>>>> 
>>>>>  BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
>>>>>    this.evaluationContext = evaluationContext;
>>>>>  }
>>>>> 
>>>>> -  @Override
>>>>> -  public Collection<CommittedBundle<?>> 
>>>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
>>>>> -    return createInitialSplits((AppliedPTransform) transform);
>>>>> -  }
>>>>> -
>>>>> -  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
>>>>> -      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
>>>>> -    BoundedSource<OutputT> source = transform.getTransform().getSource();
>>>>> -    return Collections.<CommittedBundle<?>>singleton(
>>>>> -        evaluationContext
>>>>> -            .<BoundedSourceShard<OutputT>>createRootBundle()
>>>>> -            
>>>>> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
>>>>> -            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
>>>>> -  }
>>>>> -
>>>>>  @SuppressWarnings({"unchecked", "rawtypes"})
>>>>>  @Override
>>>>>  @Nullable
>>>>> @@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements 
>>>>> RootTransformEvaluatorFactory
>>>>> 
>>>>>    abstract BoundedSource<T> getSource();
>>>>>  }
>>>>> +
>>>>> +  static class InputProvider implements RootInputProvider {
>>>>> +    private final EvaluationContext evaluationContext;
>>>>> +
>>>>> +    InputProvider(EvaluationContext evaluationContext) {
>>>>> +      this.evaluationContext = evaluationContext;
>>>>> +    }
>>>>> +
>>>>> +    @Override
>>>>> +    public Collection<CommittedBundle<?>> 
>>>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
>>>>> +      return createInitialSplits((AppliedPTransform) transform);
>>>>> +    }
>>>>> +
>>>>> +    private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
>>>>> +        AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
>>>>> +      BoundedSource<OutputT> source = 
>>>>> transform.getTransform().getSource();
>>>>> +      return Collections.<CommittedBundle<?>>singleton(
>>>>> +          evaluationContext
>>>>> +              .<BoundedSourceShard<OutputT>>createRootBundle()
>>>>> +              
>>>>> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
>>>>> +              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
>>>>> +    }
>>>>> +  }
>>>>> }
>>>>> 
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>>>>  
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>>>> index 2ec4f08..67ec3e6 100644
>>>>> --- 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>>>> +++ 
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>>>> @@ -248,12 +248,14 @@ public class DirectRunner
>>>>>    // independent executor service for each run
>>>>>    ExecutorService executorService = executorServiceSupplier.get();
>>>>> 
>>>>> +    RootInputProvider rootInputProvider = 
>>>>> RootProviderRegistry.defaultRegistry(context);
>>>>>    TransformEvaluatorRegistry registry = 
>>>>> TransformEvaluatorRegistry.defaultRegistry(context);
>>>>>    PipelineExecutor executor =
>>>>>        ExecutorServiceParallelExecutor.create(
>>>>>            executorService,
>>>>>            consumerTrackingVisitor.getValueToConsumers(),
>>>>>            keyedPValueVisitor.getKeyedPValues(),
>>>>> +            rootInputProvider,
>>>>>            registry,
>>>>>            defaultModelEnforcements(options),
>>>>>            context);
>>>>> 
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>>>>  
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>>>> new file mode 100644
>>>>> index 0000000..10d63e9
>>>>> --- /dev/null
>>>>> +++ 
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>>>> @@ -0,0 +1,49 @@
>>>>> +/*
>>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>>> + * or more contributor license agreements.  See the NOTICE file
>>>>> + * distributed with this work for additional information
>>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>>> + * to you under the Apache License, Version 2.0 (the
>>>>> + * "License"); you may not use this file except in compliance
>>>>> + * with the License.  You may obtain a copy of the License at
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>>>>> implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +package org.apache.beam.runners.direct;
>>>>> +
>>>>> +import java.util.Collection;
>>>>> +import java.util.Collections;
>>>>> +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
>>>>> +import org.apache.beam.sdk.transforms.AppliedPTransform;
>>>>> +import org.apache.beam.sdk.transforms.PTransform;
>>>>> +import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>> +
>>>>> +/**
>>>>> + * A {@link RootInputProvider} that provides a singleton empty bundle.
>>>>> + */
>>>>> +class EmptyInputProvider implements RootInputProvider {
>>>>> +  private final EvaluationContext evaluationContext;
>>>>> +
>>>>> +  EmptyInputProvider(EvaluationContext evaluationContext) {
>>>>> +    this.evaluationContext = evaluationContext;
>>>>> +  }
>>>>> +
>>>>> +  /**
>>>>> +   * {@inheritDoc}.
>>>>> +   *
>>>>> +   * <p>Returns a single empty bundle. This bundle ensures that any 
>>>>> {@link PTransform PTransforms}
>>>>> +   * that consume from the output of the provided {@link 
>>>>> AppliedPTransform} have watermarks updated
>>>>> +   * as appropriate.
>>>>> +   */
>>>>> +  @Override
>>>>> +  public Collection<CommittedBundle<?>> 
>>>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
>>>>> +    return Collections.<CommittedBundle<?>>singleton(
>>>>> +        
>>>>> evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
>>>>> +  }
>>>>> +}
>>>>> 
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>>>>  
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>>>> index bb89699..52c45c3 100644
>>>>> --- 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>>>> +++ 
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>>>> @@ -17,6 +17,8 @@
>>>>> */
>>>>> package org.apache.beam.runners.direct;
>>>>> 
>>>>> +import static com.google.common.base.Preconditions.checkState;
>>>>> +
>>>>> import com.google.auto.value.AutoValue;
>>>>> import com.google.common.base.MoreObjects;
>>>>> import com.google.common.base.Optional;
>>>>> @@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements 
>>>>> PipelineExecutor {
>>>>> 
>>>>>  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
>>>>> valueToConsumers;
>>>>>  private final Set<PValue> keyedPValues;
>>>>> +  private final RootInputProvider rootInputProvider;
>>>>>  private final TransformEvaluatorRegistry registry;
>>>>>  @SuppressWarnings("rawtypes")
>>>>>  private final Map<Class<? extends PTransform>, 
>>>>> Collection<ModelEnforcementFactory>>
>>>>> @@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor 
>>>>> implements PipelineExecutor {
>>>>>      ExecutorService executorService,
>>>>>      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
>>>>>      Set<PValue> keyedPValues,
>>>>> +      RootInputProvider rootInputProvider,
>>>>>      TransformEvaluatorRegistry registry,
>>>>>      @SuppressWarnings("rawtypes")
>>>>> -      Map<Class<? extends PTransform>, 
>>>>> Collection<ModelEnforcementFactory>> transformEnforcements,
>>>>> +          Map<Class<? extends PTransform>, 
>>>>> Collection<ModelEnforcementFactory>>
>>>>> +              transformEnforcements,
>>>>>      EvaluationContext context) {
>>>>>    return new ExecutorServiceParallelExecutor(
>>>>> -        executorService, valueToConsumers, keyedPValues, registry, 
>>>>> transformEnforcements, context);
>>>>> +        executorService,
>>>>> +        valueToConsumers,
>>>>> +        keyedPValues,
>>>>> +        rootInputProvider,
>>>>> +        registry,
>>>>> +        transformEnforcements,
>>>>> +        context);
>>>>>  }
>>>>> 
>>>>>  private ExecutorServiceParallelExecutor(
>>>>>      ExecutorService executorService,
>>>>>      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
>>>>>      Set<PValue> keyedPValues,
>>>>> +      RootInputProvider rootInputProvider,
>>>>>      TransformEvaluatorRegistry registry,
>>>>>      @SuppressWarnings("rawtypes")
>>>>>      Map<Class<? extends PTransform>, 
>>>>> Collection<ModelEnforcementFactory>> transformEnforcements,
>>>>> @@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor 
>>>>> implements PipelineExecutor {
>>>>>    this.executorService = executorService;
>>>>>    this.valueToConsumers = valueToConsumers;
>>>>>    this.keyedPValues = keyedPValues;
>>>>> +    this.rootInputProvider = rootInputProvider;
>>>>>    this.registry = registry;
>>>>>    this.transformEnforcements = transformEnforcements;
>>>>>    this.evaluationContext = context;
>>>>> @@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor 
>>>>> implements PipelineExecutor {
>>>>>  public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
>>>>>    for (AppliedPTransform<?, ?, ?> root : roots) {
>>>>>      ConcurrentLinkedQueue<CommittedBundle<?>> pending = new 
>>>>> ConcurrentLinkedQueue<>();
>>>>> -      pending.addAll(registry.getInitialInputs(root));
>>>>> +      Collection<CommittedBundle<?>> initialInputs = 
>>>>> rootInputProvider.getInitialInputs(root);
>>>>> +      checkState(
>>>>> +          !initialInputs.isEmpty(),
>>>>> +          "All root transforms must have initial inputs. Got 0 for %s",
>>>>> +          root.getFullName());
>>>>> +      pending.addAll(initialInputs);
>>>>>      pendingRootBundles.put(root, pending);
>>>>>    }
>>>>>    evaluationContext.initialize(pendingRootBundles);
>>>>> @@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor 
>>>>> implements PipelineExecutor {
>>>>>          LOG.debug("Executor Update: {}", update);
>>>>>          if (update.getBundle().isPresent()) {
>>>>>            if (ExecutorState.ACTIVE == startingState
>>>>> -                || (ExecutorState.PROCESSING == startingState && 
>>>>> noWorkOutstanding)) {
>>>>> +                || (ExecutorState.PROCESSING == startingState
>>>>> +                    && noWorkOutstanding)) {
>>>>>              scheduleConsumers(update);
>>>>>            } else {
>>>>>              allUpdates.offer(update);
>>>>> 
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>>>>  
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>>>> index 90db040..57d5628 100644
>>>>> --- 
>>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>>>> +++ 
>>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>>>> @@ -17,15 +17,12 @@
>>>>> */
>>>>> package org.apache.beam.runners.direct;
>>>>> 
>>>>> -import java.util.Collection;
>>>>> -import java.util.Collections;
>>>>> import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
>>>>> import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
>>>>> import org.apache.beam.sdk.transforms.AppliedPTransform;
>>>>> import org.apache.beam.sdk.transforms.Flatten;
>>>>> import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
>>>>> import org.apache.beam.sdk.transforms.PTransform;
>>>>> -import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>> import org.apache.beam.sdk.util.WindowedValue;
>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>> import org.apache.beam.sdk.values.PCollectionList;
>>>>> @@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
>>>>> * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the 
>>>>> {@link Flatten}
>>>>> * {@link PTransform}.
>>>>> */
>>>>> -class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
>>>>> +class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
>>>>>  priva
>>>> [message truncated...]
>>>> 
>>>> 
>>> 
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com

-- 
Daniel Kulp
dk...@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com
