[tests] integrate Wikipedia session test

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37a9b292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37a9b292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37a9b292

Branch: refs/heads/master
Commit: 37a9b292d7895897225f7484bc18d0d2db55f547
Parents: 3227fcc
Author: Max <[email protected]>
Authored: Tue Feb 23 08:30:34 2016 +0100
Committer: Davor Bonaci <[email protected]>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/TopWikipediaSessions.java    | 210 ------------------
 .../dataflow/TopWikipediaSessionsITCase.java    | 215 +++++++++++++------
 2 files changed, 144 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
deleted file mode 100644
index ab5565a..0000000
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
+++ /dev/null
@@ -1,210 +0,0 @@
-///*
-// * Copyright (C) 2015 Google Inc.
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License"); you may 
not
-// * use this file except in compliance with the License. You may obtain a 
copy of
-// * the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// * License for the specific language governing permissions and limitations 
under
-// * the License.
-// */
-//
-//package com.dataartisans.flink.dataflow;
-//
-//import com.google.api.services.bigquery.model.TableRow;
-//import com.google.cloud.dataflow.sdk.Pipeline;
-//import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-//import com.google.cloud.dataflow.sdk.io.TextIO;
-//import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-//import com.google.cloud.dataflow.sdk.options.Default;
-//import com.google.cloud.dataflow.sdk.options.Description;
-//import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-//import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-//import com.google.cloud.dataflow.sdk.options.Validation;
-//import com.google.cloud.dataflow.sdk.transforms.Count;
-//import com.google.cloud.dataflow.sdk.transforms.DoFn;
-//import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-//import com.google.cloud.dataflow.sdk.transforms.PTransform;
-//import com.google.cloud.dataflow.sdk.transforms.ParDo;
-//import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-//import com.google.cloud.dataflow.sdk.transforms.Top;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-//import com.google.cloud.dataflow.sdk.values.KV;
-//import com.google.cloud.dataflow.sdk.values.PCollection;
-//
-//import org.joda.time.Duration;
-//import org.joda.time.Instant;
-//
-//import java.util.List;
-//
-///**
-// * Copied from {@link 
com.google.cloud.dataflow.examples.complete.TopWikipediaSessions} because the 
code
-// * is private there.
-// */
-//public class TopWikipediaSessions {
-//     private static final String EXPORTED_WIKI_TABLE = 
"gs://dataflow-samples/wikipedia_edits/*.json";
-//
-//     /**
-//      * Extracts user and timestamp from a TableRow representing a Wikipedia 
edit.
-//      */
-//     static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
-//             private static final long serialVersionUID = 0;
-//
-//             @Override
-//             public void processElement(ProcessContext c) {
-//                     TableRow row = c.element();
-//                     int timestamp = (Integer) row.get("timestamp");
-//                     String userName = (String) 
row.get("contributor_username");
-//                     if (userName != null) {
-//                             // Sets the implicit timestamp field to be used 
in windowing.
-//                             c.outputWithTimestamp(userName, new 
Instant(timestamp * 1000L));
-//                     }
-//             }
-//     }
-//
-//     /**
-//      * Computes the number of edits in each user session.  A session is 
defined as
-//      * a string of edits where each is separated from the next by less than 
an hour.
-//      */
-//     static class ComputeSessions
-//                     extends PTransform<PCollection<String>, 
PCollection<KV<String, Long>>> {
-//             private static final long serialVersionUID = 0;
-//
-//             @Override
-//             public PCollection<KV<String, Long>> apply(PCollection<String> 
actions) {
-//                     return actions
-//                                     
.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
-//
-//                                     .apply(Count.<String>perElement());
-//             }
-//     }
-//
-//     /**
-//      * Computes the longest session ending in each month.
-//      */
-//     private static class TopPerMonth
-//                     extends PTransform<PCollection<KV<String, Long>>, 
PCollection<List<KV<String, Long>>>> {
-//             private static final long serialVersionUID = 0;
-//
-//             @Override
-//             public PCollection<List<KV<String, Long>>> 
apply(PCollection<KV<String, Long>> sessions) {
-//                     return sessions
-//                                     .apply(Window.<KV<String, 
Long>>into(CalendarWindows.months(1)))
-//
-//                                     .apply(Top.of(1, new 
SerializableComparator<KV<String, Long>>() {
-//                                             private static final long 
serialVersionUID = 0;
-//
-//                                             @Override
-//                                             public int compare(KV<String, 
Long> o1, KV<String, Long> o2) {
-//                                                     return 
Long.compare(o1.getValue(), o2.getValue());
-//                                             }
-//                                     }).withoutDefaults());
-//             }
-//     }
-//
-//     static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, 
KV<String, Long>>
-//                     implements RequiresWindowAccess {
-//
-//             private static final long serialVersionUID = 0;
-//
-//             @Override
-//             public void processElement(ProcessContext c) {
-//                     c.output(KV.of(
-//                                     c.element().getKey() + " : " + 
c.window(), c.element().getValue()));
-//             }
-//     }
-//
-//     static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, 
String>
-//                     implements RequiresWindowAccess {
-//             private static final long serialVersionUID = 0;
-//
-//             @Override
-//             public void processElement(ProcessContext c) {
-//                     for (KV<String, Long> item : c.element()) {
-//                             String session = item.getKey();
-//                             long count = item.getValue();
-//                             c.output(session + " : " + count + " : " + 
((IntervalWindow) c.window()).start());
-//                     }
-//             }
-//     }
-//
-//     static class ComputeTopSessions extends 
PTransform<PCollection<TableRow>, PCollection<String>> {
-//
-//             private static final long serialVersionUID = 0;
-//
-//             private final double samplingThreshold;
-//
-//             public ComputeTopSessions(double samplingThreshold) {
-//                     this.samplingThreshold = samplingThreshold;
-//             }
-//
-//             @Override
-//             public PCollection<String> apply(PCollection<TableRow> input) {
-//                     return input
-//                                     .apply(ParDo.of(new 
ExtractUserAndTimestamp()))
-//
-//                                     .apply(ParDo.named("SampleUsers").of(
-//                                                     new DoFn<String, 
String>() {
-//                                                             private static 
final long serialVersionUID = 0;
-//
-//                                                             @Override
-//                                                             public void 
processElement(ProcessContext c) {
-//                                                                     if 
(Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
-//                                                                             
c.output(c.element());
-//                                                                     }
-//                                                             }
-//                                                     }))
-//
-//                                     .apply(new ComputeSessions())
-//
-//                                     
.apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
-//                                     .apply(new TopPerMonth())
-//                                     
.apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
-//             }
-//     }
-//
-//     /**
-//      * Options supported by this class.
-//      *
-//      * <p> Inherits standard Dataflow configuration options.
-//      */
-//     private static interface Options extends PipelineOptions {
-//             @Description(
-//                             "Input specified as a GCS path containing a 
BigQuery table exported as json")
-//             @Default.String(EXPORTED_WIKI_TABLE)
-//             String getInput();
-//             void setInput(String value);
-//
-//             @Description("File to output results to")
-//             @Validation.Required
-//             String getOutput();
-//             void setOutput(String value);
-//     }
-//
-//     public static void main(String[] args) {
-//             Options options = PipelineOptionsFactory.fromArgs(args)
-//                             .withValidation()
-//                             .as(Options.class);
-//             DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
-//
-//             Pipeline p = Pipeline.create(dataflowOptions);
-//
-//             double samplingThreshold = 0.1;
-//
-//             p.apply(TextIO.Read
-//                             .from(options.getInput())
-//                             .withCoder(TableRowJsonCoder.of()))
-//                             .apply(new 
ComputeTopSessions(samplingThreshold))
-//                             
.apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
-//
-//             p.run();
-//     }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
index 9c8147b..eb020c5 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
@@ -1,71 +1,144 @@
-///*
-// * Copyright 2015 Data Artisans GmbH
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.dataartisans.flink.dataflow;
-//
-//import com.google.api.services.bigquery.model.TableRow;
-//import com.google.cloud.dataflow.sdk.Pipeline;
-//import com.google.cloud.dataflow.sdk.io.TextIO;
-//import com.google.cloud.dataflow.sdk.transforms.Create;
-//import com.google.cloud.dataflow.sdk.values.PCollection;
-//import com.google.common.base.Joiner;
-//import org.apache.flink.test.util.JavaProgramTestBase;
-//
-//import java.util.Arrays;
-//
-//public class TopWikipediaSessionsITCase extends JavaProgramTestBase {
-//     protected String resultPath;
-//
-//     public TopWikipediaSessionsITCase(){
-//     }
-//
-//     static final String[] EXPECTED_RESULT = new String[] {
-//                     "user1 : 
[1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)"
-//                                     + " : 3 : 1970-01-01T00:00:00.000Z",
-//                     "user3 : 
[1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)"
-//                                     + " : 1 : 1970-02-01T00:00:00.000Z" };
-//
-//     @Override
-//     protected void preSubmit() throws Exception {
-//             resultPath = getTempDirPath("result");
-//     }
-//
-//     @Override
-//     protected void postSubmit() throws Exception {
-//             
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-//     }
-//
-//     @Override
-//     protected void testProgram() throws Exception {
-//
-//             Pipeline p = FlinkTestPipeline.create();
-//
-//             PCollection<String> output =
-//                             p.apply(Create.of(Arrays.asList(new 
TableRow().set("timestamp", 0).set
-//                                             ("contributor_username", 
"user1"), new TableRow().set("timestamp", 1).set
-//                                             ("contributor_username", 
"user1"), new TableRow().set("timestamp", 2).set
-//                                             ("contributor_username", 
"user1"), new TableRow().set("timestamp", 0).set
-//                                             ("contributor_username", 
"user2"), new TableRow().set("timestamp", 1).set
-//                                             ("contributor_username", 
"user2"), new TableRow().set("timestamp", 3601).set
-//                                             ("contributor_username", 
"user2"), new TableRow().set("timestamp", 3602).set
-//                                             ("contributor_username", 
"user2"), new TableRow().set("timestamp", 35 * 24 * 3600)
-//                                             .set("contributor_username", 
"user3"))))
-//                                             .apply(new 
TopWikipediaSessions.ComputeTopSessions(1.0));
-//
-//             output.apply(TextIO.Write.to(resultPath));
-//
-//             p.run();
-//     }
-//}
/*
 * Copyright 2015 Data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dataartisans.flink.dataflow;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Joiner;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;

import java.io.Serializable;
import java.util.Arrays;


/**
 * Session-window integration test for the Flink streaming runner.
 *
 * <p>Feeds a fixed set of synthetic Wikipedia-edit rows through a
 * {@link Sessions} window with a one-minute gap and counts edits per user
 * per session. The expected output lists one line per (user, session) pair.
 */
public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
	private static final long serialVersionUID = 0;

	protected String resultPath;

	/**
	 * Expected per-session edit counts. With a one-minute session gap and
	 * timestamps scaled by 1000 below, offsets up to ~59 land in one session
	 * while the +200..+245 cluster forms a second session per user:
	 * user1 -> {3, 1}, user2 -> {6, 4}, user3 -> {2, 7}.
	 */
	static final String[] EXPECTED_RESULT = new String[] {
			"user: user1 value:3",
			"user: user1 value:1",
			"user: user2 value:4",
			"user: user2 value:6",
			"user: user3 value:7",
			"user: user3 value:2"
	};

	/** Builds one synthetic Wikipedia edit row. */
	private static TableRow edit(long timestamp, String userName) {
		return new TableRow().set("timestamp", timestamp).set("contributor_username", userName);
	}

	@Override
	protected void preSubmit() throws Exception {
		resultPath = getTempDirPath("result");
	}

	@Override
	protected void postSubmit() throws Exception {
		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
	}

	@Override
	protected void testProgram() throws Exception {

		Pipeline p = FlinkTestPipeline.createStreaming();

		// Base timestamp slightly in the future so the elements are not
		// considered late relative to the streaming runner's clock.
		long now = System.currentTimeMillis() + 10000;

		PCollection<KV<String, Long>> output =
				p.apply(Create.of(Arrays.asList(
						edit(now, "user1"),
						edit(now + 10, "user3"),
						edit(now, "user2"),
						edit(now, "user1"),
						edit(now + 2, "user1"),
						edit(now, "user2"),
						edit(now + 1, "user2"),
						edit(now + 5, "user2"),
						edit(now + 7, "user2"),
						edit(now + 8, "user2"),
						edit(now + 200, "user2"),
						edit(now + 230, "user1"),
						edit(now + 230, "user2"),
						edit(now + 240, "user2"),
						edit(now + 245, "user3"),
						edit(now + 235, "user3"),
						edit(now + 236, "user3"),
						edit(now + 237, "user3"),
						edit(now + 238, "user3"),
						edit(now + 239, "user3"),
						edit(now + 240, "user3"),
						edit(now + 241, "user2"),
						edit(now, "user3"))))

				.apply(ParDo.named("ExtractUserAndTimestamp").of(new DoFn<TableRow, String>() {
					private static final long serialVersionUID = 0;

					@Override
					public void processElement(ProcessContext c) throws Exception {
						TableRow row = c.element();
						long timestamp = (Long) row.get("timestamp");
						String userName = (String) row.get("contributor_username");
						if (userName != null) {
							// Sets the element timestamp used by the session windowing below.
							c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
						}
					}
				}))

				.apply(ParDo.named("SampleUsers").of(
						new DoFn<String, String>() {
							private static final long serialVersionUID = 0;

							@Override
							public void processElement(ProcessContext c) {
								// Sampling threshold of 1.0 keeps every element; retained only
								// to mirror the sampling stage of the original example pipeline.
								if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) {
									c.output(c.element());
								}
							}
						}))

				.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
				.apply(Count.<String>perElement());

		PCollection<String> format = output.apply(ParDo.named("FormatResults").of(
				new DoFn<KV<String, Long>, String>() {
					private static final long serialVersionUID = 0;

					@Override
					public void processElement(ProcessContext c) throws Exception {
						KV<String, Long> el = c.element();
						c.output("user: " + el.getKey() + " value:" + el.getValue());
					}
				}));

		format.apply(TextIO.Write.to(resultPath));

		p.run();
	}
}

Reply via email to