Got your message, so it was a mistake and already fixed.

No worries. Thanks,
Regards
JB

On 10/14/2016 06:35 AM, Jean-Baptiste Onofré wrote:
I saw a push of a eclipse branch on the Beam git repo.

Maybe I missed it, but I didn't see discussion about such branch on the
dev mailing list.

Regards
JB

On 10/13/2016 06:52 PM, Daniel Kulp wrote:

I just submitted a pull request that fixes the code as well as
cherry-picks the yaml change from the last branch.

Dan



On Oct 13, 2016, at 10:48 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Indeed the .travis.yml has not been merged. I gonna fix that.

Sorry about that.

Regards
JB

On 10/13/2016 04:37 PM, Daniel Kulp wrote:

This is in m2e.    That said, it looks like the travis.yml file
wasn’t merged from my “eclipse” branch so Travis wasn’t actually
running agains the eclipse compiler.   That would have caught
this.   JB and I will investigate how that got lost in the merge to
master.

A "mvn -Peclipse-jdt clean install” in direct-java would show the
same error.


Dan



On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré
<j...@nanthrax.net> wrote:

Hi Dan,

You mean directly building in Eclipse I guess using m2e ?

Regards
JB

On 10/13/2016 03:59 PM, Daniel Kulp wrote:

Just an FYI:   this commit has caused things to not build in
Eclipse, but I’m not exactly sure why.   The errors are in place
where methods of the exact signature just moved into an internal
class so I’m not yet sure why it’s causing an issue.

Description    Resource    Path    Location    Type
Bound mismatch: The type Read.Bounded<OutputT> is not a valid
substitute for the bounded parameter <TransformT extends
PTransform<? super InputT,OutputT>> of the type
AppliedPTransform<InputT,OutputT,TransformT>
BoundedReadEvaluatorFactory.java
/beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
line 134    Java Problem


Dan




On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:
Remove KeyedResourcePool

This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0

Tree:
http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0

Branch: refs/heads/master
Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
Parents: 7306e16
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 5 13:12:48 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:14:38 2016 -0700

----------------------------------------------------------------------

.../direct/BoundedReadEvaluatorFactory.java     |  40 +++--
.../beam/runners/direct/DirectRunner.java       |   2 +
.../beam/runners/direct/EmptyInputProvider.java |  49 ++++++
.../direct/ExecutorServiceParallelExecutor.java |  27 ++-
.../runners/direct/FlattenEvaluatorFactory.java |  18 +-
.../beam/runners/direct/KeyedResourcePool.java  |  47 ------
.../runners/direct/LockedKeyedResourcePool.java |  95 -----------
.../beam/runners/direct/RootInputProvider.java  |  41 +++++
.../runners/direct/RootProviderRegistry.java    |  65 ++++++++
.../direct/RootTransformEvaluatorFactory.java   |  42 -----
.../direct/TestStreamEvaluatorFactory.java      |  39 +++--
.../direct/TransformEvaluatorRegistry.java      |  17 +-
.../direct/UnboundedReadEvaluatorFactory.java   |  56 ++++---
.../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
.../direct/FlattenEvaluatorFactoryTest.java     |   3 +-
.../direct/LockedKeyedResourcePoolTest.java     | 163
-------------------
.../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
.../UnboundedReadEvaluatorFactoryTest.java      |   8 +-
18 files changed, 269 insertions(+), 449 deletions(-)
----------------------------------------------------------------------



http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

----------------------------------------------------------------------

diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

index 4936ad9..326a535 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

@@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
* A {@link TransformEvaluatorFactory} that produces {@link
TransformEvaluator TransformEvaluators}
* for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
*/
-final class BoundedReadEvaluatorFactory implements
RootTransformEvaluatorFactory {
+final class BoundedReadEvaluatorFactory implements
TransformEvaluatorFactory {
 private final EvaluationContext evaluationContext;

 BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
   this.evaluationContext = evaluationContext;
 }

-  @Override
-  public Collection<CommittedBundle<?>>
getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    return createInitialSplits((AppliedPTransform) transform);
-  }
-
-  private <OutputT> Collection<CommittedBundle<?>>
createInitialSplits(
-      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
-    BoundedSource<OutputT> source =
transform.getTransform().getSource();
-    return Collections.<CommittedBundle<?>>singleton(
-        evaluationContext
-            .<BoundedSourceShard<OutputT>>createRootBundle()
-
.add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))

-            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
 @SuppressWarnings({"unchecked", "rawtypes"})
 @Override
 @Nullable
@@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory
implements RootTransformEvaluatorFactory

   abstract BoundedSource<T> getSource();
 }
+
+  static class InputProvider implements RootInputProvider {
+    private final EvaluationContext evaluationContext;
+
+    InputProvider(EvaluationContext evaluationContext) {
+      this.evaluationContext = evaluationContext;
+    }
+
+    @Override
+    public Collection<CommittedBundle<?>>
getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+      return createInitialSplits((AppliedPTransform) transform);
+    }
+
+    private <OutputT> Collection<CommittedBundle<?>>
createInitialSplits(
+        AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+      BoundedSource<OutputT> source =
transform.getTransform().getSource();
+      return Collections.<CommittedBundle<?>>singleton(
+          evaluationContext
+              .<BoundedSourceShard<OutputT>>createRootBundle()
+
.add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))

+              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    }
+  }
}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

----------------------------------------------------------------------

diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

index 2ec4f08..67ec3e6 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

@@ -248,12 +248,14 @@ public class DirectRunner
   // independent executor service for each run
   ExecutorService executorService = executorServiceSupplier.get();

+    RootInputProvider rootInputProvider =
RootProviderRegistry.defaultRegistry(context);
   TransformEvaluatorRegistry registry =
TransformEvaluatorRegistry.defaultRegistry(context);
   PipelineExecutor executor =
       ExecutorServiceParallelExecutor.create(
           executorService,
           consumerTrackingVisitor.getValueToConsumers(),
           keyedPValueVisitor.getKeyedPValues(),
+            rootInputProvider,
           registry,
           defaultModelEnforcements(options),
           context);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java

----------------------------------------------------------------------

diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java

new file mode 100644
index 0000000..10d63e9
--- /dev/null
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
software
+ * distributed under the License is distributed on an "AS IS"
BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
+ * See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A {@link RootInputProvider} that provides a singleton empty
bundle.
+ */
+class EmptyInputProvider implements RootInputProvider {
+  private final EvaluationContext evaluationContext;
+
+  EmptyInputProvider(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>Returns a single empty bundle. This bundle ensures that
any {@link PTransform PTransforms}
+   * that consume from the output of the provided {@link
AppliedPTransform} have watermarks updated
+   * as appropriate.
+   */
+  @Override
+  public Collection<CommittedBundle<?>>
getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return Collections.<CommittedBundle<?>>singleton(
+
evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));

+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java

----------------------------------------------------------------------

diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java

index bb89699..52c45c3 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java

+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java

@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;

+import static com.google.common.base.Preconditions.checkState;
+
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
@@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor
implements PipelineExecutor {

 private final Map<PValue, Collection<AppliedPTransform<?, ?,
?>>> valueToConsumers;
 private final Set<PValue> keyedPValues;
+  private final RootInputProvider rootInputProvider;
 private final TransformEvaluatorRegistry registry;
 @SuppressWarnings("rawtypes")
 private final Map<Class<? extends PTransform>,
Collection<ModelEnforcementFactory>>
@@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor
implements PipelineExecutor {
     ExecutorService executorService,
     Map<PValue, Collection<AppliedPTransform<?, ?, ?>>>
valueToConsumers,
     Set<PValue> keyedPValues,
+      RootInputProvider rootInputProvider,
     TransformEvaluatorRegistry registry,
     @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>,
Collection<ModelEnforcementFactory>> transformEnforcements,
+          Map<Class<? extends PTransform>,
Collection<ModelEnforcementFactory>>
+              transformEnforcements,
     EvaluationContext context) {
   return new ExecutorServiceParallelExecutor(
-        executorService, valueToConsumers, keyedPValues,
registry, transformEnforcements, context);
+        executorService,
+        valueToConsumers,
+        keyedPValues,
+        rootInputProvider,
+        registry,
+        transformEnforcements,
+        context);
 }

 private ExecutorServiceParallelExecutor(
     ExecutorService executorService,
     Map<PValue, Collection<AppliedPTransform<?, ?, ?>>>
valueToConsumers,
     Set<PValue> keyedPValues,
+      RootInputProvider rootInputProvider,
     TransformEvaluatorRegistry registry,
     @SuppressWarnings("rawtypes")
     Map<Class<? extends PTransform>,
Collection<ModelEnforcementFactory>> transformEnforcements,
@@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor
implements PipelineExecutor {
   this.executorService = executorService;
   this.valueToConsumers = valueToConsumers;
   this.keyedPValues = keyedPValues;
+    this.rootInputProvider = rootInputProvider;
   this.registry = registry;
   this.transformEnforcements = transformEnforcements;
   this.evaluationContext = context;
@@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor
implements PipelineExecutor {
 public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
   for (AppliedPTransform<?, ?, ?> root : roots) {
     ConcurrentLinkedQueue<CommittedBundle<?>> pending = new
ConcurrentLinkedQueue<>();
-      pending.addAll(registry.getInitialInputs(root));
+      Collection<CommittedBundle<?>> initialInputs =
rootInputProvider.getInitialInputs(root);
+      checkState(
+          !initialInputs.isEmpty(),
+          "All root transforms must have initial inputs. Got 0
for %s",
+          root.getFullName());
+      pending.addAll(initialInputs);
     pendingRootBundles.put(root, pending);
   }
   evaluationContext.initialize(pendingRootBundles);
@@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor
implements PipelineExecutor {
         LOG.debug("Executor Update: {}", update);
         if (update.getBundle().isPresent()) {
           if (ExecutorState.ACTIVE == startingState
-                || (ExecutorState.PROCESSING == startingState &&
noWorkOutstanding)) {
+                || (ExecutorState.PROCESSING == startingState
+                    && noWorkOutstanding)) {
             scheduleConsumers(update);
           } else {
             allUpdates.offer(update);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java

----------------------------------------------------------------------

diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java

index 90db040..57d5628 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java

+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java

@@ -17,15 +17,12 @@
*/
package org.apache.beam.runners.direct;

-import java.util.Collection;
-import java.util.Collections;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import
org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import
org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -34,26 +31,13 @@ import
org.apache.beam.sdk.values.PCollectionList;
* The {@link DirectRunner} {@link TransformEvaluatorFactory} for
the {@link Flatten}
* {@link PTransform}.
*/
-class FlattenEvaluatorFactory implements
RootTransformEvaluatorFactory {
+class FlattenEvaluatorFactory implements
TransformEvaluatorFactory {
 priva
[message truncated...]



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to