johnyangk commented on a change in pull request #62: [NEMO-71] Add 
LocationShareAssignmentPass and Example Application
URL: https://github.com/apache/incubator-nemo/pull/62#discussion_r199793952
 
 

 ##########
 File path: 
examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
 ##########
 @@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An app that analyzes data flow from network trace.
+ */
+public final class NetworkTraceAnalysis {
+  /**
+   * Private constructor.
+   */
+  private NetworkTraceAnalysis() {
+  }
+
+  /**
+   * Main function for the Beam program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String input0FilePath = args[0];
+    final String input1FilePath = args[1];
+    final String outputFilePath = args[2];
+    final PipelineOptions options = 
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("NetworkTraceAnalysis");
+
+    final Pattern pattern = Pattern.compile(" *\\d+ +[0-9.]+ +([0-9.]+) -> 
([0-9.]+) +.*Len=(\\d+)");
+
+    final SimpleFunction<String, Boolean> filter = new SimpleFunction<String, 
Boolean>() {
+      @Override
+      public Boolean apply(final String line) {
+        return pattern.matcher(line).find();
+      }
+    };
+    final SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, 
Double>> mapToStdev
+        = new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, 
KV<String, Double>>() {
+      @Override
+      public KV<String, Double> apply(final KV<String, Iterable<KV<String, 
Long>>> kv) {
+        return KV.of(kv.getKey(), stdev(kv.getValue()));
+      }
+    };
+
+    final Pipeline p = Pipeline.create(options);
+    final PCollection<KV<String, Double>> in0 = GenericSourceSink.read(p, 
input0FilePath)
+        .apply(Filter.by(filter))
 
 Review comment:
   Can you share code for L77-L87 and L89-L99?
   Something like the following code would be nice. (Sorry for the code 
quality; I wrote it in a hurry.)
   
   // index1 is for the packet's src ip, index2 is for the packet's dst ip
   sharedCode(filepath, index1, index2) {
     .read(filepath)
     .apply(filter)
     .apply(mapelements(index1, index2))
   }
   
   L77-L87 becomes sharedCode(input0FilePath, 2, 1)
   L89-L99 becomes sharedCode(input1FilePath, 1, 2)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to