Hi Dan,

You mean building directly in Eclipse, using m2e I guess?

Regards
JB

On 10/13/2016 03:59 PM, Daniel Kulp wrote:

Just an FYI: this commit has caused things to not build in Eclipse, but I’m 
not exactly sure why. The errors are in places where methods with the exact 
same signature were just moved into a nested class, so I’m not yet sure why 
it’s causing an issue.

Description: Bound mismatch: The type Read.Bounded<OutputT> is not a valid substitute for the bounded parameter <TransformT extends PTransform<? super InputT,OutputT>> of the type AppliedPTransform<InputT,OutputT,TransformT>
Resource:    BoundedReadEvaluatorFactory.java
Path:        /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
Location:    line 134
Type:        Java Problem
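
For reference, the construct the error points at can be reduced to a small sketch. Every name below (Transform, BoundedRead, Applied, BoundMismatchSketch) is a hypothetical stand-in, not a Beam class, and bundles are replaced by plain objects. The sketch only mirrors the shape of the code in the commit: a raw-typed cast handed to a generic helper whose parameter fixes the third type argument while leaving the first two as wildcards. javac is expected to accept this shape; whether Eclipse's compiler rejects this reduced version in the same way is an assumption and has not been verified.

```java
import java.util.Collection;
import java.util.Collections;

public class BoundMismatchSketch {
  // Hypothetical stand-in for PTransform<InputT, OutputT>.
  interface Transform<InputT, OutputT> {}

  // Hypothetical stand-in for Read.Bounded<T>: its output type depends on T.
  static final class BoundedRead<T> implements Transform<Void, T> {
    private final T source;

    BoundedRead(T source) {
      this.source = source;
    }

    T getSource() {
      return source;
    }
  }

  // Hypothetical stand-in for AppliedPTransform, with the same bound on the
  // third type parameter that the Eclipse message quotes.
  static final class Applied<
      InputT, OutputT, TransformT extends Transform<? super InputT, OutputT>> {
    private final TransformT transform;

    Applied(TransformT transform) {
      this.transform = transform;
    }

    TransformT getTransform() {
      return transform;
    }
  }

  // Same pattern as getInitialInputs in the diff: erase the wildcards with a
  // raw cast, then let the generic helper pick up OutputT.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static Collection<Object> getInitialInputs(Applied<?, ?, ?> transform) {
    return createInitialSplits((Applied) transform);
  }

  // The parameter type mixes wildcards with a concrete third argument. This is
  // the kind of declaration a strict bound check may flag, because the second
  // wildcard is not tied to OutputT.
  private static <OutputT> Collection<Object> createInitialSplits(
      Applied<?, ?, BoundedRead<OutputT>> transform) {
    OutputT source = transform.getTransform().getSource();
    return Collections.<Object>singleton(source);
  }

  public static void main(String[] args) {
    Applied<Void, String, BoundedRead<String>> applied =
        new Applied<Void, String, BoundedRead<String>>(new BoundedRead<String>("shard-0"));
    System.out.println(getInitialInputs(applied));
  }
}
```

If the reduced version does show the same difference between the two compilers, it would suggest the problem lies in how the helper's parameter type is checked rather than in the move into the nested class itself.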


Dan




On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:
Remove KeyedResourcePool

This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.
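
The idea in this message, that the runner asks a provider for each root transform's initial bundles and refuses to start if a root has none, can be sketched as follows. This is a simplified illustration and not the Beam code: bundles are plain strings, RootInputSketch and Registry are hypothetical names, and dispatching on the transform's class is an assumption about how a registry of providers might work.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class RootInputSketch {
  /** Simplified stand-in for RootInputProvider; a bundle is just a String here. */
  interface RootInputProvider {
    Collection<String> getInitialInputs(Object rootTransform);
  }

  /** Assumed dispatch strategy: pick the provider by the root transform's class. */
  static final class Registry implements RootInputProvider {
    private final Map<Class<?>, RootInputProvider> providers;

    Registry(Map<Class<?>, RootInputProvider> providers) {
      this.providers = new HashMap<>(providers);
    }

    @Override
    public Collection<String> getInitialInputs(Object rootTransform) {
      RootInputProvider provider = providers.get(rootTransform.getClass());
      if (provider == null) {
        throw new IllegalStateException("No provider for " + rootTransform.getClass());
      }
      return provider.getInitialInputs(rootTransform);
    }
  }

  /** Collects pending bundles per root and fails fast when a root has none. */
  static Map<Object, Queue<String>> start(Collection<?> roots, RootInputProvider provider) {
    Map<Object, Queue<String>> pendingRootBundles = new LinkedHashMap<>();
    for (Object root : roots) {
      Collection<String> initialInputs = provider.getInitialInputs(root);
      if (initialInputs.isEmpty()) {
        throw new IllegalStateException(
            "All root transforms must have initial inputs. Got 0 for " + root);
      }
      pendingRootBundles.put(root, new ArrayDeque<>(initialInputs));
    }
    return pendingRootBundles;
  }

  public static void main(String[] args) {
    Map<Class<?>, RootInputProvider> providers = new HashMap<>();
    // A String root stands in for a read: one bundle wrapping its source.
    providers.put(String.class, root -> Collections.singletonList("bundle(" + root + ")"));
    // An Integer root stands in for a transform that only needs an empty bundle.
    providers.put(Integer.class, root -> Collections.singletonList("empty-bundle"));

    List<Object> roots = Arrays.<Object>asList("read-source", 42);
    System.out.println(start(roots, new Registry(providers)));
  }
}
```

Keeping the provider separate from the evaluator factories means the executor needs only one object to ask for initial inputs, which matches the extra rootInputProvider argument the diff adds to ExecutorServiceParallelExecutor.create.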


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0

Branch: refs/heads/master
Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
Parents: 7306e16
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 5 13:12:48 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:14:38 2016 -0700

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     |  40 +++--
 .../beam/runners/direct/DirectRunner.java       |   2 +
 .../beam/runners/direct/EmptyInputProvider.java |  49 ++++++
 .../direct/ExecutorServiceParallelExecutor.java |  27 ++-
 .../runners/direct/FlattenEvaluatorFactory.java |  18 +-
 .../beam/runners/direct/KeyedResourcePool.java  |  47 ------
 .../runners/direct/LockedKeyedResourcePool.java |  95 -----------
 .../beam/runners/direct/RootInputProvider.java  |  41 +++++
 .../runners/direct/RootProviderRegistry.java    |  65 ++++++++
 .../direct/RootTransformEvaluatorFactory.java   |  42 -----
 .../direct/TestStreamEvaluatorFactory.java      |  39 +++--
 .../direct/TransformEvaluatorRegistry.java      |  17 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |  56 ++++---
 .../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
 .../direct/FlattenEvaluatorFactoryTest.java     |   3 +-
 .../direct/LockedKeyedResourcePoolTest.java     | 163 -------------------
 .../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |   8 +-
 18 files changed, 269 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 4936ad9..326a535 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
  * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
  */
-final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
+final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;

   BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
   }

-  @Override
-  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    return createInitialSplits((AppliedPTransform) transform);
-  }
-
-  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
-      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
-    BoundedSource<OutputT> source = transform.getTransform().getSource();
-    return Collections.<CommittedBundle<?>>singleton(
-        evaluationContext
-            .<BoundedSourceShard<OutputT>>createRootBundle()
-            .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
-            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
@@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory

     abstract BoundedSource<T> getSource();
   }
+
+  static class InputProvider implements RootInputProvider {
+    private final EvaluationContext evaluationContext;
+
+    InputProvider(EvaluationContext evaluationContext) {
+      this.evaluationContext = evaluationContext;
+    }
+
+    @Override
+    public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+      return createInitialSplits((AppliedPTransform) transform);
+    }
+
+    private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+        AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+      BoundedSource<OutputT> source = transform.getTransform().getSource();
+      return Collections.<CommittedBundle<?>>singleton(
+          evaluationContext
+              .<BoundedSourceShard<OutputT>>createRootBundle()
+              .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
+              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2ec4f08..67ec3e6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -248,12 +248,14 @@ public class DirectRunner
     // independent executor service for each run
     ExecutorService executorService = executorServiceSupplier.get();

+    RootInputProvider rootInputProvider = RootProviderRegistry.defaultRegistry(context);
     TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context);
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
             executorService,
             consumerTrackingVisitor.getValueToConsumers(),
             keyedPValueVisitor.getKeyedPValues(),
+            rootInputProvider,
             registry,
             defaultModelEnforcements(options),
             context);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
new file mode 100644
index 0000000..10d63e9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A {@link RootInputProvider} that provides a singleton empty bundle.
+ */
+class EmptyInputProvider implements RootInputProvider {
+  private final EvaluationContext evaluationContext;
+
+  EmptyInputProvider(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>Returns a single empty bundle. This bundle ensures that any {@link PTransform PTransforms}
+   * that consume from the output of the provided {@link AppliedPTransform} have watermarks updated
+   * as appropriate.
+   */
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return Collections.<CommittedBundle<?>>singleton(
+        evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index bb89699..52c45c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;

+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
@@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {

   private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
   private final Set<PValue> keyedPValues;
+  private final RootInputProvider rootInputProvider;
   private final TransformEvaluatorRegistry registry;
   @SuppressWarnings("rawtypes")
   private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
@@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       ExecutorService executorService,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Set<PValue> keyedPValues,
+      RootInputProvider rootInputProvider,
       TransformEvaluatorRegistry registry,
       @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
+          Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+              transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
-        executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context);
+        executorService,
+        valueToConsumers,
+        keyedPValues,
+        rootInputProvider,
+        registry,
+        transformEnforcements,
+        context);
   }

   private ExecutorServiceParallelExecutor(
       ExecutorService executorService,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Set<PValue> keyedPValues,
+      RootInputProvider rootInputProvider,
       TransformEvaluatorRegistry registry,
       @SuppressWarnings("rawtypes")
       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
@@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     this.executorService = executorService;
     this.valueToConsumers = valueToConsumers;
     this.keyedPValues = keyedPValues;
+    this.rootInputProvider = rootInputProvider;
     this.registry = registry;
     this.transformEnforcements = transformEnforcements;
     this.evaluationContext = context;
@@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
     for (AppliedPTransform<?, ?, ?> root : roots) {
       ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
-      pending.addAll(registry.getInitialInputs(root));
+      Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root);
+      checkState(
+          !initialInputs.isEmpty(),
+          "All root transforms must have initial inputs. Got 0 for %s",
+          root.getFullName());
+      pending.addAll(initialInputs);
       pendingRootBundles.put(root, pending);
     }
     evaluationContext.initialize(pendingRootBundles);
@@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
             if (ExecutorState.ACTIVE == startingState
-                || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
+                || (ExecutorState.PROCESSING == startingState
+                    && noWorkOutstanding)) {
               scheduleConsumers(update);
             } else {
               allUpdates.offer(update);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 90db040..57d5628 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.runners.direct;

-import java.util.Collection;
-import java.util.Collections;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
  * {@link PTransform}.
  */
-class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
+class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
   priva
[message truncated...]



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
