This is in m2e. That said, it looks like the .travis.yml file wasn’t merged
from my “eclipse” branch, so Travis wasn’t actually running against the Eclipse
compiler. That would have caught this. JB and I will investigate how that
got lost in the merge to master.

Running "mvn -Peclipse-jdt clean install" in direct-java would show the same error.


Dan



> On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
> Hi Dan,
> 
> You mean building directly in Eclipse using m2e, I guess?
> 
> Regards
> JB
> 
> On 10/13/2016 03:59 PM, Daniel Kulp wrote:
>> 
>> Just an FYI: this commit has caused things to not build in Eclipse, but
>> I’m not exactly sure why. The errors are in places where methods with the
>> exact same signature just moved into an internal class, so I’m not yet sure
>> why it’s causing an issue.
>> 
>> Description: Bound mismatch: The type Read.Bounded<OutputT> is not a valid
>>   substitute for the bounded parameter <TransformT extends PTransform<? super
>>   InputT,OutputT>> of the type AppliedPTransform<InputT,OutputT,TransformT>
>> Resource: BoundedReadEvaluatorFactory.java
>> Path: /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
>> Location: line 134
>> Type: Java Problem
>> 
>> 
>> Dan
>> 
>> 
>> 
>> 
>> On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:
>>> Remove KeyedResourcePool
>>> 
>>> This interface is no longer used. Instead, the runner ensures that
>>> bundles will be provided containing the appropriate input to the
>>> TestStreamEvaluatorFactory.
>>> 
>>> 
>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
>>> Commit: 
>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
>>> Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
>>> Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0
>>> 
>>> Branch: refs/heads/master
>>> Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
>>> Parents: 7306e16
>>> Author: Thomas Groh <tg...@google.com>
>>> Authored: Wed Oct 5 13:12:48 2016 -0700
>>> Committer: Luke Cwik <lc...@google.com>
>>> Committed: Thu Oct 6 15:14:38 2016 -0700
>>> 
>>> ----------------------------------------------------------------------
>>> .../direct/BoundedReadEvaluatorFactory.java     |  40 +++--
>>> .../beam/runners/direct/DirectRunner.java       |   2 +
>>> .../beam/runners/direct/EmptyInputProvider.java |  49 ++++++
>>> .../direct/ExecutorServiceParallelExecutor.java |  27 ++-
>>> .../runners/direct/FlattenEvaluatorFactory.java |  18 +-
>>> .../beam/runners/direct/KeyedResourcePool.java  |  47 ------
>>> .../runners/direct/LockedKeyedResourcePool.java |  95 -----------
>>> .../beam/runners/direct/RootInputProvider.java  |  41 +++++
>>> .../runners/direct/RootProviderRegistry.java    |  65 ++++++++
>>> .../direct/RootTransformEvaluatorFactory.java   |  42 -----
>>> .../direct/TestStreamEvaluatorFactory.java      |  39 +++--
>>> .../direct/TransformEvaluatorRegistry.java      |  17 +-
>>> .../direct/UnboundedReadEvaluatorFactory.java   |  56 ++++---
>>> .../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
>>> .../direct/FlattenEvaluatorFactoryTest.java     |   3 +-
>>> .../direct/LockedKeyedResourcePoolTest.java     | 163 -------------------
>>> .../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
>>> .../UnboundedReadEvaluatorFactoryTest.java      |   8 +-
>>> 18 files changed, 269 insertions(+), 449 deletions(-)
>>> ----------------------------------------------------------------------
>>> 
>>> 
>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>> ----------------------------------------------------------------------
>>> diff --git 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>>  
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>> index 4936ad9..326a535 100644
>>> --- 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>> +++ 
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>>> @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
>>>  * A {@link TransformEvaluatorFactory} that produces {@link 
>>> TransformEvaluator TransformEvaluators}
>>>  * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
>>>  */
>>> -final class BoundedReadEvaluatorFactory implements 
>>> RootTransformEvaluatorFactory {
>>> +final class BoundedReadEvaluatorFactory implements 
>>> TransformEvaluatorFactory {
>>>   private final EvaluationContext evaluationContext;
>>> 
>>>   BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
>>>     this.evaluationContext = evaluationContext;
>>>   }
>>> 
>>> -  @Override
>>> -  public Collection<CommittedBundle<?>> 
>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
>>> -    return createInitialSplits((AppliedPTransform) transform);
>>> -  }
>>> -
>>> -  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
>>> -      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
>>> -    BoundedSource<OutputT> source = transform.getTransform().getSource();
>>> -    return Collections.<CommittedBundle<?>>singleton(
>>> -        evaluationContext
>>> -            .<BoundedSourceShard<OutputT>>createRootBundle()
>>> -            
>>> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
>>> -            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
>>> -  }
>>> -
>>>   @SuppressWarnings({"unchecked", "rawtypes"})
>>>   @Override
>>>   @Nullable
>>> @@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements 
>>> RootTransformEvaluatorFactory
>>> 
>>>     abstract BoundedSource<T> getSource();
>>>   }
>>> +
>>> +  static class InputProvider implements RootInputProvider {
>>> +    private final EvaluationContext evaluationContext;
>>> +
>>> +    InputProvider(EvaluationContext evaluationContext) {
>>> +      this.evaluationContext = evaluationContext;
>>> +    }
>>> +
>>> +    @Override
>>> +    public Collection<CommittedBundle<?>> 
>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
>>> +      return createInitialSplits((AppliedPTransform) transform);
>>> +    }
>>> +
>>> +    private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
>>> +        AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
>>> +      BoundedSource<OutputT> source = transform.getTransform().getSource();
>>> +      return Collections.<CommittedBundle<?>>singleton(
>>> +          evaluationContext
>>> +              .<BoundedSourceShard<OutputT>>createRootBundle()
>>> +              
>>> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
>>> +              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
>>> +    }
>>> +  }
>>> }
>>> 
>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>> ----------------------------------------------------------------------
>>> diff --git 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>>  
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>> index 2ec4f08..67ec3e6 100644
>>> --- 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>> +++ 
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
>>> @@ -248,12 +248,14 @@ public class DirectRunner
>>>     // independent executor service for each run
>>>     ExecutorService executorService = executorServiceSupplier.get();
>>> 
>>> +    RootInputProvider rootInputProvider = 
>>> RootProviderRegistry.defaultRegistry(context);
>>>     TransformEvaluatorRegistry registry = 
>>> TransformEvaluatorRegistry.defaultRegistry(context);
>>>     PipelineExecutor executor =
>>>         ExecutorServiceParallelExecutor.create(
>>>             executorService,
>>>             consumerTrackingVisitor.getValueToConsumers(),
>>>             keyedPValueVisitor.getKeyedPValues(),
>>> +            rootInputProvider,
>>>             registry,
>>>             defaultModelEnforcements(options),
>>>             context);
>>> 
>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>> ----------------------------------------------------------------------
>>> diff --git 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>>  
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>> new file mode 100644
>>> index 0000000..10d63e9
>>> --- /dev/null
>>> +++ 
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
>>> @@ -0,0 +1,49 @@
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +package org.apache.beam.runners.direct;
>>> +
>>> +import java.util.Collection;
>>> +import java.util.Collections;
>>> +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
>>> +import org.apache.beam.sdk.transforms.AppliedPTransform;
>>> +import org.apache.beam.sdk.transforms.PTransform;
>>> +import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>> +
>>> +/**
>>> + * A {@link RootInputProvider} that provides a singleton empty bundle.
>>> + */
>>> +class EmptyInputProvider implements RootInputProvider {
>>> +  private final EvaluationContext evaluationContext;
>>> +
>>> +  EmptyInputProvider(EvaluationContext evaluationContext) {
>>> +    this.evaluationContext = evaluationContext;
>>> +  }
>>> +
>>> +  /**
>>> +   * {@inheritDoc}.
>>> +   *
>>> +   * <p>Returns a single empty bundle. This bundle ensures that any {@link 
>>> PTransform PTransforms}
>>> +   * that consume from the output of the provided {@link 
>>> AppliedPTransform} have watermarks updated
>>> +   * as appropriate.
>>> +   */
>>> +  @Override
>>> +  public Collection<CommittedBundle<?>> 
>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
>>> +    return Collections.<CommittedBundle<?>>singleton(
>>> +        
>>> evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
>>> +  }
>>> +}
>>> 
>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>> ----------------------------------------------------------------------
>>> diff --git 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>>  
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>> index bb89699..52c45c3 100644
>>> --- 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>> +++ 
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
>>> @@ -17,6 +17,8 @@
>>>  */
>>> package org.apache.beam.runners.direct;
>>> 
>>> +import static com.google.common.base.Preconditions.checkState;
>>> +
>>> import com.google.auto.value.AutoValue;
>>> import com.google.common.base.MoreObjects;
>>> import com.google.common.base.Optional;
>>> @@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements 
>>> PipelineExecutor {
>>> 
>>>   private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
>>> valueToConsumers;
>>>   private final Set<PValue> keyedPValues;
>>> +  private final RootInputProvider rootInputProvider;
>>>   private final TransformEvaluatorRegistry registry;
>>>   @SuppressWarnings("rawtypes")
>>>   private final Map<Class<? extends PTransform>, 
>>> Collection<ModelEnforcementFactory>>
>>> @@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor 
>>> implements PipelineExecutor {
>>>       ExecutorService executorService,
>>>       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
>>>       Set<PValue> keyedPValues,
>>> +      RootInputProvider rootInputProvider,
>>>       TransformEvaluatorRegistry registry,
>>>       @SuppressWarnings("rawtypes")
>>> -      Map<Class<? extends PTransform>, 
>>> Collection<ModelEnforcementFactory>> transformEnforcements,
>>> +          Map<Class<? extends PTransform>, 
>>> Collection<ModelEnforcementFactory>>
>>> +              transformEnforcements,
>>>       EvaluationContext context) {
>>>     return new ExecutorServiceParallelExecutor(
>>> -        executorService, valueToConsumers, keyedPValues, registry, 
>>> transformEnforcements, context);
>>> +        executorService,
>>> +        valueToConsumers,
>>> +        keyedPValues,
>>> +        rootInputProvider,
>>> +        registry,
>>> +        transformEnforcements,
>>> +        context);
>>>   }
>>> 
>>>   private ExecutorServiceParallelExecutor(
>>>       ExecutorService executorService,
>>>       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
>>>       Set<PValue> keyedPValues,
>>> +      RootInputProvider rootInputProvider,
>>>       TransformEvaluatorRegistry registry,
>>>       @SuppressWarnings("rawtypes")
>>>       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
>>> transformEnforcements,
>>> @@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements 
>>> PipelineExecutor {
>>>     this.executorService = executorService;
>>>     this.valueToConsumers = valueToConsumers;
>>>     this.keyedPValues = keyedPValues;
>>> +    this.rootInputProvider = rootInputProvider;
>>>     this.registry = registry;
>>>     this.transformEnforcements = transformEnforcements;
>>>     this.evaluationContext = context;
>>> @@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements 
>>> PipelineExecutor {
>>>   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
>>>     for (AppliedPTransform<?, ?, ?> root : roots) {
>>>       ConcurrentLinkedQueue<CommittedBundle<?>> pending = new 
>>> ConcurrentLinkedQueue<>();
>>> -      pending.addAll(registry.getInitialInputs(root));
>>> +      Collection<CommittedBundle<?>> initialInputs = 
>>> rootInputProvider.getInitialInputs(root);
>>> +      checkState(
>>> +          !initialInputs.isEmpty(),
>>> +          "All root transforms must have initial inputs. Got 0 for %s",
>>> +          root.getFullName());
>>> +      pending.addAll(initialInputs);
>>>       pendingRootBundles.put(root, pending);
>>>     }
>>>     evaluationContext.initialize(pendingRootBundles);
>>> @@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements 
>>> PipelineExecutor {
>>>           LOG.debug("Executor Update: {}", update);
>>>           if (update.getBundle().isPresent()) {
>>>             if (ExecutorState.ACTIVE == startingState
>>> -                || (ExecutorState.PROCESSING == startingState && 
>>> noWorkOutstanding)) {
>>> +                || (ExecutorState.PROCESSING == startingState
>>> +                    && noWorkOutstanding)) {
>>>               scheduleConsumers(update);
>>>             } else {
>>>               allUpdates.offer(update);
>>> 
>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>> ----------------------------------------------------------------------
>>> diff --git 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>>  
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>> index 90db040..57d5628 100644
>>> --- 
>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>> +++ 
>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
>>> @@ -17,15 +17,12 @@
>>>  */
>>> package org.apache.beam.runners.direct;
>>> 
>>> -import java.util.Collection;
>>> -import java.util.Collections;
>>> import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
>>> import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
>>> import org.apache.beam.sdk.transforms.AppliedPTransform;
>>> import org.apache.beam.sdk.transforms.Flatten;
>>> import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
>>> import org.apache.beam.sdk.transforms.PTransform;
>>> -import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>> import org.apache.beam.sdk.util.WindowedValue;
>>> import org.apache.beam.sdk.values.PCollection;
>>> import org.apache.beam.sdk.values.PCollectionList;
>>> @@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
>>>  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the 
>>> {@link Flatten}
>>>  * {@link PTransform}.
>>>  */
>>> -class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
>>> +class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
>>>   priva
>> [message truncated...]
>> 
>> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com

-- 
Daniel Kulp
dk...@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com
