I just submitted a pull request that fixes the code as well as cherry-picks the yaml change from the last branch.
Dan > On Oct 13, 2016, at 10:48 AM, Jean-Baptiste Onofré <[email protected]> wrote: > > Indeed the .travis.yml has not been merged. I gonna fix that. > > Sorry about that. > > Regards > JB > > On 10/13/2016 04:37 PM, Daniel Kulp wrote: >> >> This is in m2e. That said, it looks like the travis.yml file wasn’t >> merged from my “eclipse” branch so Travis wasn’t actually running agains the >> eclipse compiler. That would have caught this. JB and I will investigate >> how that got lost in the merge to master. >> >> A "mvn -Peclipse-jdt clean install” in direct-java would show the same error. >> >> >> Dan >> >> >> >>> On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré <[email protected]> >>> wrote: >>> >>> Hi Dan, >>> >>> You mean directly building in Eclipse I guess using m2e ? >>> >>> Regards >>> JB >>> >>> On 10/13/2016 03:59 PM, Daniel Kulp wrote: >>>> >>>> Just an FYI: this commit has caused things to not build in Eclipse, but >>>> I’m not exactly sure why. The errors are in place where methods of the >>>> exact signature just moved into an internal class so I’m not yet sure why >>>> it’s causing an issue. >>>> >>>> Description Resource Path Location Type >>>> Bound mismatch: The type Read.Bounded<OutputT> is not a valid substitute >>>> for the bounded parameter <TransformT extends PTransform<? super >>>> InputT,OutputT>> of the type AppliedPTransform<InputT,OutputT,TransformT> >>>> BoundedReadEvaluatorFactory.java >>>> /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct >>>> line 134 Java Problem >>>> >>>> >>>> Dan >>>> >>>> >>>> >>>> >>>> On 2016-10-06 18:31 (-0400), [email protected] wrote: >>>>> Remove KeyedResourcePool >>>>> >>>>> This interface is no longer used. Instead, the runner ensures that >>>>> bundles will be provided containing the appropriate input to the >>>>> TestStreamEvaluatorFactory. >>>>> >>>>> >>>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo >>>>> Commit: >>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0 >>>>> Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0 >>>>> Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0 >>>>> >>>>> Branch: refs/heads/master >>>>> Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774 >>>>> Parents: 7306e16 >>>>> Author: Thomas Groh <[email protected]> >>>>> Authored: Wed Oct 5 13:12:48 2016 -0700 >>>>> Committer: Luke Cwik <[email protected]> >>>>> Committed: Thu Oct 6 15:14:38 2016 -0700 >>>>> >>>>> ---------------------------------------------------------------------- >>>>> .../direct/BoundedReadEvaluatorFactory.java | 40 +++-- >>>>> .../beam/runners/direct/DirectRunner.java | 2 + >>>>> .../beam/runners/direct/EmptyInputProvider.java | 49 ++++++ >>>>> .../direct/ExecutorServiceParallelExecutor.java | 27 ++- >>>>> .../runners/direct/FlattenEvaluatorFactory.java | 18 +- >>>>> .../beam/runners/direct/KeyedResourcePool.java | 47 ------ >>>>> .../runners/direct/LockedKeyedResourcePool.java | 95 ----------- >>>>> .../beam/runners/direct/RootInputProvider.java | 41 +++++ >>>>> .../runners/direct/RootProviderRegistry.java | 65 ++++++++ >>>>> .../direct/RootTransformEvaluatorFactory.java | 42 ----- >>>>> .../direct/TestStreamEvaluatorFactory.java | 39 +++-- >>>>> .../direct/TransformEvaluatorRegistry.java | 17 +- >>>>> .../direct/UnboundedReadEvaluatorFactory.java | 56 ++++--- >>>>> .../direct/BoundedReadEvaluatorFactoryTest.java | 3 +- >>>>> .../direct/FlattenEvaluatorFactoryTest.java | 3 +- >>>>> .../direct/LockedKeyedResourcePoolTest.java | 163 ------------------- >>>>> .../direct/TestStreamEvaluatorFactoryTest.java | 3 +- >>>>> .../UnboundedReadEvaluatorFactoryTest.java | 8 +- >>>>> 18 files changed, 269 insertions(+), 449 deletions(-) >>>>> ---------------------------------------------------------------------- >>>>> >>>>> >>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java >>>>> ---------------------------------------------------------------------- >>>>> diff --git >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java >>>>> >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java >>>>> index 4936ad9..326a535 100644 >>>>> --- >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java >>>>> +++ >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java >>>>> @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection; >>>>> * A {@link TransformEvaluatorFactory} that produces {@link >>>>> TransformEvaluator TransformEvaluators} >>>>> * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. >>>>> */ >>>>> -final class BoundedReadEvaluatorFactory implements >>>>> RootTransformEvaluatorFactory { >>>>> +final class BoundedReadEvaluatorFactory implements >>>>> TransformEvaluatorFactory { >>>>> private final EvaluationContext evaluationContext; >>>>> >>>>> BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) { >>>>> this.evaluationContext = evaluationContext; >>>>> } >>>>> >>>>> - @Override >>>>> - public Collection<CommittedBundle<?>> >>>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) { >>>>> - return createInitialSplits((AppliedPTransform) transform); >>>>> - } >>>>> - >>>>> - private <OutputT> Collection<CommittedBundle<?>> createInitialSplits( >>>>> - AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) { >>>>> - BoundedSource<OutputT> source = transform.getTransform().getSource(); >>>>> - return Collections.<CommittedBundle<?>>singleton( >>>>> - evaluationContext >>>>> - .<BoundedSourceShard<OutputT>>createRootBundle() >>>>> - >>>>> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source))) >>>>> - .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); >>>>> - } >>>>> - >>>>> @SuppressWarnings({"unchecked", "rawtypes"}) >>>>> @Override >>>>> @Nullable >>>>> @@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements >>>>> RootTransformEvaluatorFactory >>>>> >>>>> abstract BoundedSource<T> getSource(); >>>>> } >>>>> + >>>>> + static class InputProvider implements RootInputProvider { >>>>> + private final EvaluationContext evaluationContext; >>>>> + >>>>> + InputProvider(EvaluationContext evaluationContext) { >>>>> + this.evaluationContext = evaluationContext; >>>>> + } >>>>> + >>>>> + @Override >>>>> + public Collection<CommittedBundle<?>> >>>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) { >>>>> + return createInitialSplits((AppliedPTransform) transform); >>>>> + } >>>>> + >>>>> + private <OutputT> Collection<CommittedBundle<?>> createInitialSplits( >>>>> + AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) { >>>>> + BoundedSource<OutputT> source = >>>>> transform.getTransform().getSource(); >>>>> + return Collections.<CommittedBundle<?>>singleton( >>>>> + evaluationContext >>>>> + .<BoundedSourceShard<OutputT>>createRootBundle() >>>>> + >>>>> .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source))) >>>>> + .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); >>>>> + } >>>>> + } >>>>> } >>>>> >>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java >>>>> ---------------------------------------------------------------------- >>>>> diff --git >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java >>>>> >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java >>>>> index 2ec4f08..67ec3e6 100644 >>>>> --- >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java >>>>> +++ >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java >>>>> @@ -248,12 +248,14 @@ public class DirectRunner >>>>> // independent executor service for each run >>>>> ExecutorService executorService = executorServiceSupplier.get(); >>>>> >>>>> + RootInputProvider rootInputProvider = >>>>> RootProviderRegistry.defaultRegistry(context); >>>>> TransformEvaluatorRegistry registry = >>>>> TransformEvaluatorRegistry.defaultRegistry(context); >>>>> PipelineExecutor executor = >>>>> ExecutorServiceParallelExecutor.create( >>>>> executorService, >>>>> consumerTrackingVisitor.getValueToConsumers(), >>>>> keyedPValueVisitor.getKeyedPValues(), >>>>> + rootInputProvider, >>>>> registry, >>>>> defaultModelEnforcements(options), >>>>> context); >>>>> >>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java >>>>> ---------------------------------------------------------------------- >>>>> diff --git >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java >>>>> >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java >>>>> new file mode 100644 >>>>> index 0000000..10d63e9 >>>>> --- /dev/null >>>>> +++ >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java >>>>> @@ -0,0 +1,49 @@ >>>>> +/* >>>>> + * Licensed to the Apache Software Foundation (ASF) under one >>>>> + * or more contributor license agreements. See the NOTICE file >>>>> + * distributed with this work for additional information >>>>> + * regarding copyright ownership. The ASF licenses this file >>>>> + * to you under the Apache License, Version 2.0 (the >>>>> + * "License"); you may not use this file except in compliance >>>>> + * with the License. You may obtain a copy of the License at >>>>> + * >>>>> + * http://www.apache.org/licenses/LICENSE-2.0 >>>>> + * >>>>> + * Unless required by applicable law or agreed to in writing, software >>>>> + * distributed under the License is distributed on an "AS IS" BASIS, >>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >>>>> implied. >>>>> + * See the License for the specific language governing permissions and >>>>> + * limitations under the License. >>>>> + */ >>>>> +package org.apache.beam.runners.direct; >>>>> + >>>>> +import java.util.Collection; >>>>> +import java.util.Collections; >>>>> +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; >>>>> +import org.apache.beam.sdk.transforms.AppliedPTransform; >>>>> +import org.apache.beam.sdk.transforms.PTransform; >>>>> +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; >>>>> + >>>>> +/** >>>>> + * A {@link RootInputProvider} that provides a singleton empty bundle. >>>>> + */ >>>>> +class EmptyInputProvider implements RootInputProvider { >>>>> + private final EvaluationContext evaluationContext; >>>>> + >>>>> + EmptyInputProvider(EvaluationContext evaluationContext) { >>>>> + this.evaluationContext = evaluationContext; >>>>> + } >>>>> + >>>>> + /** >>>>> + * {@inheritDoc}. >>>>> + * >>>>> + * <p>Returns a single empty bundle. This bundle ensures that any >>>>> {@link PTransform PTransforms} >>>>> + * that consume from the output of the provided {@link >>>>> AppliedPTransform} have watermarks updated >>>>> + * as appropriate. >>>>> + */ >>>>> + @Override >>>>> + public Collection<CommittedBundle<?>> >>>>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) { >>>>> + return Collections.<CommittedBundle<?>>singleton( >>>>> + >>>>> evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); >>>>> + } >>>>> +} >>>>> >>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java >>>>> ---------------------------------------------------------------------- >>>>> diff --git >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java >>>>> >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java >>>>> index bb89699..52c45c3 100644 >>>>> --- >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java >>>>> +++ >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java >>>>> @@ -17,6 +17,8 @@ >>>>> */ >>>>> package org.apache.beam.runners.direct; >>>>> >>>>> +import static com.google.common.base.Preconditions.checkState; >>>>> + >>>>> import com.google.auto.value.AutoValue; >>>>> import com.google.common.base.MoreObjects; >>>>> import com.google.common.base.Optional; >>>>> @@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements >>>>> PipelineExecutor { >>>>> >>>>> private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> >>>>> valueToConsumers; >>>>> private final Set<PValue> keyedPValues; >>>>> + private final RootInputProvider rootInputProvider; >>>>> private final TransformEvaluatorRegistry registry; >>>>> @SuppressWarnings("rawtypes") >>>>> private final Map<Class<? extends PTransform>, >>>>> Collection<ModelEnforcementFactory>> >>>>> @@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor >>>>> implements PipelineExecutor { >>>>> ExecutorService executorService, >>>>> Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, >>>>> Set<PValue> keyedPValues, >>>>> + RootInputProvider rootInputProvider, >>>>> TransformEvaluatorRegistry registry, >>>>> @SuppressWarnings("rawtypes") >>>>> - Map<Class<? extends PTransform>, >>>>> Collection<ModelEnforcementFactory>> transformEnforcements, >>>>> + Map<Class<? extends PTransform>, >>>>> Collection<ModelEnforcementFactory>> >>>>> + transformEnforcements, >>>>> EvaluationContext context) { >>>>> return new ExecutorServiceParallelExecutor( >>>>> - executorService, valueToConsumers, keyedPValues, registry, >>>>> transformEnforcements, context); >>>>> + executorService, >>>>> + valueToConsumers, >>>>> + keyedPValues, >>>>> + rootInputProvider, >>>>> + registry, >>>>> + transformEnforcements, >>>>> + context); >>>>> } >>>>> >>>>> private ExecutorServiceParallelExecutor( >>>>> ExecutorService executorService, >>>>> Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, >>>>> Set<PValue> keyedPValues, >>>>> + RootInputProvider rootInputProvider, >>>>> TransformEvaluatorRegistry registry, >>>>> @SuppressWarnings("rawtypes") >>>>> Map<Class<? extends PTransform>, >>>>> Collection<ModelEnforcementFactory>> transformEnforcements, >>>>> @@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor >>>>> implements PipelineExecutor { >>>>> this.executorService = executorService; >>>>> this.valueToConsumers = valueToConsumers; >>>>> this.keyedPValues = keyedPValues; >>>>> + this.rootInputProvider = rootInputProvider; >>>>> this.registry = registry; >>>>> this.transformEnforcements = transformEnforcements; >>>>> this.evaluationContext = context; >>>>> @@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor >>>>> implements PipelineExecutor { >>>>> public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { >>>>> for (AppliedPTransform<?, ?, ?> root : roots) { >>>>> ConcurrentLinkedQueue<CommittedBundle<?>> pending = new >>>>> ConcurrentLinkedQueue<>(); >>>>> - pending.addAll(registry.getInitialInputs(root)); >>>>> + Collection<CommittedBundle<?>> initialInputs = >>>>> rootInputProvider.getInitialInputs(root); >>>>> + checkState( >>>>> + !initialInputs.isEmpty(), >>>>> + "All root transforms must have initial inputs. Got 0 for %s", >>>>> + root.getFullName()); >>>>> + pending.addAll(initialInputs); >>>>> pendingRootBundles.put(root, pending); >>>>> } >>>>> evaluationContext.initialize(pendingRootBundles); >>>>> @@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor >>>>> implements PipelineExecutor { >>>>> LOG.debug("Executor Update: {}", update); >>>>> if (update.getBundle().isPresent()) { >>>>> if (ExecutorState.ACTIVE == startingState >>>>> - || (ExecutorState.PROCESSING == startingState && >>>>> noWorkOutstanding)) { >>>>> + || (ExecutorState.PROCESSING == startingState >>>>> + && noWorkOutstanding)) { >>>>> scheduleConsumers(update); >>>>> } else { >>>>> allUpdates.offer(update); >>>>> >>>>> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java >>>>> ---------------------------------------------------------------------- >>>>> diff --git >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java >>>>> >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java >>>>> index 90db040..57d5628 100644 >>>>> --- >>>>> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java >>>>> +++ >>>>> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java >>>>> @@ -17,15 +17,12 @@ >>>>> */ >>>>> package org.apache.beam.runners.direct; >>>>> >>>>> -import java.util.Collection; >>>>> -import java.util.Collections; >>>>> import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; >>>>> import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; >>>>> import org.apache.beam.sdk.transforms.AppliedPTransform; >>>>> import org.apache.beam.sdk.transforms.Flatten; >>>>> import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; >>>>> import org.apache.beam.sdk.transforms.PTransform; >>>>> -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; >>>>> import org.apache.beam.sdk.util.WindowedValue; >>>>> import org.apache.beam.sdk.values.PCollection; >>>>> import org.apache.beam.sdk.values.PCollectionList; >>>>> @@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList; >>>>> * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the >>>>> {@link Flatten} >>>>> * {@link PTransform}. >>>>> */ >>>>> -class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory { >>>>> +class FlattenEvaluatorFactory implements TransformEvaluatorFactory { >>>>> priva >>>> [message truncated...] >>>> >>>> >>> >>> -- >>> Jean-Baptiste Onofré >>> [email protected] >>> http://blog.nanthrax.net >>> Talend - http://www.talend.com >> > > -- > Jean-Baptiste Onofré > [email protected] > http://blog.nanthrax.net > Talend - http://www.talend.com -- Daniel Kulp [email protected] - http://dankulp.com/blog Talend Community Coder - http://coders.talend.com
