[ 
https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=252064&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252064
 ]

ASF GitHub Bot logged work on BEAM-7305:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/May/19 21:13
            Start Date: 31/May/19 21:13
    Worklog Time Spent: 10m 
      Work Description: viliam-durina commented on pull request #8699: 
[BEAM-7305] Fix issues in and extend documentation for Hazelcast Jet Runner 
URL: https://github.com/apache/beam/pull/8699#discussion_r289552490
 
 

 ##########
 File path: 
runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/ImpulseP.java
 ##########
 @@ -19,28 +19,89 @@
 
 import com.hazelcast.jet.core.AbstractProcessor;
 import com.hazelcast.jet.core.Processor;
-import com.hazelcast.jet.function.SupplierEx;
+import com.hazelcast.jet.core.ProcessorMetaSupplier;
+import com.hazelcast.jet.core.ProcessorSupplier;
+import com.hazelcast.nio.Address;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import org.apache.beam.runners.jet.Utils;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * /** * Jet {@link com.hazelcast.jet.core.Processor} implementation for 
Beam's Impulse primitive.
  */
 public class ImpulseP extends AbstractProcessor {
 
+  private final boolean active;
+  private final Coder outputCoder;
   private final String ownerId; // do not remove it, very useful for debugging
 
-  private ImpulseP(String ownerId) {
+  private ImpulseP(boolean active, Coder outputCoder, String ownerId) {
+    this.active = active;
+    this.outputCoder = outputCoder;
     this.ownerId = ownerId;
   }
 
-  public static SupplierEx<Processor> supplier(String ownerId) {
-    return () -> new ImpulseP(ownerId);
-  }
-
   @Override
   public boolean complete() {
-    return tryEmit(
-        WindowedValue.valueInGlobalWindow(
-            new byte[0])); // todo: should EACH processor emit this byte[] or 
just a SINGLE one?
+    if (active) {
+      return tryEmit(Utils.encode(WindowedValue.valueInGlobalWindow(new 
byte[0]), outputCoder));
+    } else {
+      return true;
+    }
+  }
+
+  public static ProcessorMetaSupplier supplier(Coder outputCoder, String 
ownerId) {
+    return new ImpulseMetaProcessorSupplier(outputCoder, ownerId);
+  }
+
+  private static class ImpulseMetaProcessorSupplier implements 
ProcessorMetaSupplier {
+
+    private final Coder outputCoder;
+    private final String ownerId;
+
+    private ImpulseMetaProcessorSupplier(Coder outputCoder, String ownerId) {
+      this.outputCoder = outputCoder;
+      this.ownerId = ownerId;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nonnull
+    @Override
+    public Function<? super Address, ? extends ProcessorSupplier> get(
+        @Nonnull List<Address> addresses) {
+      return address -> new ImpulseProcessorSupplier(outputCoder, ownerId);
+    }
+  }
+
+  private static class ImpulseProcessorSupplier<T> implements 
ProcessorSupplier {
+    private final Coder outputCoder;
+    private final String ownerId;
+    private transient ProcessorSupplier.Context context;
+
+    private ImpulseProcessorSupplier(Coder outputCoder, String ownerId) {
+      this.outputCoder = outputCoder;
+      this.ownerId = ownerId;
+    }
+
+    @Override
+    public void init(@Nonnull Context context) {
+      this.context = context;
+    }
+
+    @Nonnull
+    @Override
+    public Collection<? extends Processor> get(int count) {
+      int indexBase = context.memberIndex() * context.localParallelism();
+      List<Processor> res = new ArrayList<>(count);
+      for (int i = 0; i < count; i++, indexBase++) {
+        res.add(new ImpulseP(indexBase == 0, outputCoder, ownerId));
 
 Review comment:
   You don't need a ProcessorMetaSupplier/ProcessorSupplier (PMS/PS) for this:
`Processor.Context` has `globalProcessorIndex()`.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 252064)
    Time Spent: 8.5h  (was: 8h 20m)

> Add first version of Hazelcast Jet Runner
> -----------------------------------------
>
>                 Key: BEAM-7305
>                 URL: https://issues.apache.org/jira/browse/BEAM-7305
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-jet
>            Reporter: Maximilian Michels
>            Assignee: Jozsef Bartok
>            Priority: Major
>             Fix For: 2.14.0
>
>          Time Spent: 8.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to