madhawa-gunasekara commented on a change in pull request #175: GORA-546 
Hazelcast Jet execution engine support
URL: https://github.com/apache/gora/pull/175#discussion_r303978932
 
 

 ##########
 File path: gora-jet/src/main/java/org/apache/gora/jet/JetSource.java
 ##########
 @@ -0,0 +1,89 @@
+package org.apache.gora.jet;
+
+import com.hazelcast.jet.Traverser;
+import com.hazelcast.jet.core.AbstractProcessor;
+import com.hazelcast.jet.core.ProcessorMetaSupplier;
+import com.hazelcast.jet.core.ProcessorSupplier;
+import com.hazelcast.nio.Address;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Result;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static com.hazelcast.jet.Traversers.traverseIterable;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+
+public class JetSource<KeyIn, ValueIn extends PersistentBase> implements 
ProcessorMetaSupplier {
+
+  private int totalParallelism;
+  private transient int localParallelism;
+
+  @Override
+  public void init(@Nonnull Context context) {
+    totalParallelism = context.totalParallelism();
+    localParallelism = context.localParallelism();
+  }
+
+  @Nonnull
+  @Override
+  public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull 
List<Address> addresses) {
+    Map<Address, ProcessorSupplier> map = new HashMap<>();
+    for (int i = 0; i < addresses.size(); i++) {
+      // We'll calculate the global index of each processor in the cluster:
+      //globalIndexBase is the first processor index in a certain Jet-Cluster 
member
+      int globalIndexBase = localParallelism * i;
+
+      // processorCount will be equal to localParallelism:
+      ProcessorSupplier supplier = processorCount ->
+          range(globalIndexBase, globalIndexBase + processorCount)
+              .mapToObj(globalIndex ->
+                  new GoraJetProcessor<KeyIn, 
ValueIn>(getPartionedData(globalIndex))
+              ).collect(toList());
+      map.put(addresses.get(i), supplier);
+    }
+    return map::get;
+  }
+
+  List<JetInputOutputFormat<KeyIn, ValueIn>> getPartionedData(int globalIndex) 
{
+    try {
+      List<PartitionQuery<KeyIn, ValueIn>> partitionQueries = 
JetEngine.dataInStore.getPartitions(JetEngine.query);
+      List<JetInputOutputFormat<KeyIn, ValueIn>> resultsList = new 
ArrayList<>();
+      int i = 1;
+      int partitionNo = globalIndex;
+      while (partitionNo < partitionQueries.size()) {
+        Result<KeyIn, ValueIn> result = null;
+        result = partitionQueries.get(partitionNo).execute();
+        while (result.next()) {
+          resultsList.add(new JetInputOutputFormat<>(result.getKey(), 
result.get()));
+        }
+        partitionNo = (i * totalParallelism) + globalIndex;
+        i++;
+      }
+      return resultsList;
+    } catch (Exception e) {
+      e.printStackTrace();
 
 Review comment:
   Remove printStackTraces 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to