[tests] integrate Wikipedia session test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37a9b292 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37a9b292 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37a9b292 Branch: refs/heads/master Commit: 37a9b292d7895897225f7484bc18d0d2db55f547 Parents: 3227fcc Author: Max <[email protected]> Authored: Tue Feb 23 08:30:34 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../flink/dataflow/TopWikipediaSessions.java | 210 ------------------ .../dataflow/TopWikipediaSessionsITCase.java | 215 +++++++++++++------ 2 files changed, 144 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java deleted file mode 100644 index ab5565a..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java +++ /dev/null @@ -1,210 +0,0 @@ -///* -// * Copyright (C) 2015 Google Inc. -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); you may not -// * use this file except in compliance with the License. You may obtain a copy of -// * the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// * License for the specific language governing permissions and limitations under -// * the License. -// */ -// -//package com.dataartisans.flink.dataflow; -// -//import com.google.api.services.bigquery.model.TableRow; -//import com.google.cloud.dataflow.sdk.Pipeline; -//import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; -//import com.google.cloud.dataflow.sdk.io.TextIO; -//import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -//import com.google.cloud.dataflow.sdk.options.Default; -//import com.google.cloud.dataflow.sdk.options.Description; -//import com.google.cloud.dataflow.sdk.options.PipelineOptions; -//import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -//import com.google.cloud.dataflow.sdk.options.Validation; -//import com.google.cloud.dataflow.sdk.transforms.Count; -//import com.google.cloud.dataflow.sdk.transforms.DoFn; -//import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; -//import com.google.cloud.dataflow.sdk.transforms.PTransform; -//import com.google.cloud.dataflow.sdk.transforms.ParDo; -//import com.google.cloud.dataflow.sdk.transforms.SerializableComparator; -//import com.google.cloud.dataflow.sdk.transforms.Top; -//import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows; -//import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; -//import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; -//import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -//import com.google.cloud.dataflow.sdk.values.KV; -//import com.google.cloud.dataflow.sdk.values.PCollection; -// -//import org.joda.time.Duration; -//import org.joda.time.Instant; -// -//import java.util.List; -// -///** -// * Copied from {@link com.google.cloud.dataflow.examples.complete.TopWikipediaSessions} because the code -// * is private there. -// */ -//public class TopWikipediaSessions { -// private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json"; -// -// /** -// * Extracts user and timestamp from a TableRow representing a Wikipedia edit. -// */ -// static class ExtractUserAndTimestamp extends DoFn<TableRow, String> { -// private static final long serialVersionUID = 0; -// -// @Override -// public void processElement(ProcessContext c) { -// TableRow row = c.element(); -// int timestamp = (Integer) row.get("timestamp"); -// String userName = (String) row.get("contributor_username"); -// if (userName != null) { -// // Sets the implicit timestamp field to be used in windowing. -// c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); -// } -// } -// } -// -// /** -// * Computes the number of edits in each user session. A session is defined as -// * a string of edits where each is separated from the next by less than an hour. -// */ -// static class ComputeSessions -// extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { -// private static final long serialVersionUID = 0; -// -// @Override -// public PCollection<KV<String, Long>> apply(PCollection<String> actions) { -// return actions -// .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1)))) -// -// .apply(Count.<String>perElement()); -// } -// } -// -// /** -// * Computes the longest session ending in each month. -// */ -// private static class TopPerMonth -// extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> { -// private static final long serialVersionUID = 0; -// -// @Override -// public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) { -// return sessions -// .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1))) -// -// .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() { -// private static final long serialVersionUID = 0; -// -// @Override -// public int compare(KV<String, Long> o1, KV<String, Long> o2) { -// return Long.compare(o1.getValue(), o2.getValue()); -// } -// }).withoutDefaults()); -// } -// } -// -// static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>> -// implements RequiresWindowAccess { -// -// private static final long serialVersionUID = 0; -// -// @Override -// public void processElement(ProcessContext c) { -// c.output(KV.of( -// c.element().getKey() + " : " + c.window(), c.element().getValue())); -// } -// } -// -// static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String> -// implements RequiresWindowAccess { -// private static final long serialVersionUID = 0; -// -// @Override -// public void processElement(ProcessContext c) { -// for (KV<String, Long> item : c.element()) { -// String session = item.getKey(); -// long count = item.getValue(); -// c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start()); -// } -// } -// } -// -// static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> { -// -// private static final long serialVersionUID = 0; -// -// private final double samplingThreshold; -// -// public ComputeTopSessions(double samplingThreshold) { -// this.samplingThreshold = samplingThreshold; -// } -// -// @Override -// public PCollection<String> apply(PCollection<TableRow> input) { -// return input -// .apply(ParDo.of(new ExtractUserAndTimestamp())) -// -// .apply(ParDo.named("SampleUsers").of( -// new DoFn<String, String>() { -// private static final long serialVersionUID = 0; -// -// @Override -// public void processElement(ProcessContext c) { -// if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) { -// c.output(c.element()); -// } -// } -// })) -// -// .apply(new ComputeSessions()) -// -// .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn())) -// .apply(new TopPerMonth()) -// .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn())); -// } -// } -// -// /** -// * Options supported by this class. -// * -// * <p> Inherits standard Dataflow configuration options. -// */ -// private static interface Options extends PipelineOptions { -// @Description( -// "Input specified as a GCS path containing a BigQuery table exported as json") -// @Default.String(EXPORTED_WIKI_TABLE) -// String getInput(); -// void setInput(String value); -// -// @Description("File to output results to") -// @Validation.Required -// String getOutput(); -// void setOutput(String value); -// } -// -// public static void main(String[] args) { -// Options options = PipelineOptionsFactory.fromArgs(args) -// .withValidation() -// .as(Options.class); -// DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); -// -// Pipeline p = Pipeline.create(dataflowOptions); -// -// double samplingThreshold = 0.1; -// -// p.apply(TextIO.Read -// .from(options.getInput()) -// .withCoder(TableRowJsonCoder.of())) -// .apply(new ComputeTopSessions(samplingThreshold)) -// .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput())); -// -// p.run(); -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java index 9c8147b..eb020c5 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java @@ -1,71 +1,144 @@ -///* -// * Copyright 2015 Data Artisans GmbH -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package com.dataartisans.flink.dataflow; -// -//import com.google.api.services.bigquery.model.TableRow; -//import com.google.cloud.dataflow.sdk.Pipeline; -//import com.google.cloud.dataflow.sdk.io.TextIO; -//import com.google.cloud.dataflow.sdk.transforms.Create; -//import com.google.cloud.dataflow.sdk.values.PCollection; -//import com.google.common.base.Joiner; -//import org.apache.flink.test.util.JavaProgramTestBase; -// -//import java.util.Arrays; -// -//public class TopWikipediaSessionsITCase extends JavaProgramTestBase { -// protected String resultPath; -// -// public TopWikipediaSessionsITCase(){ -// } -// -// static final String[] EXPECTED_RESULT = new String[] { -// "user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)" -// + " : 3 : 1970-01-01T00:00:00.000Z", -// "user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)" -// + " : 1 : 1970-02-01T00:00:00.000Z" }; -// -// @Override -// protected void preSubmit() throws Exception { -// resultPath = getTempDirPath("result"); -// } -// -// @Override -// protected void postSubmit() throws Exception { -// compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); -// } -// -// @Override -// protected void testProgram() throws Exception { -// -// Pipeline p = FlinkTestPipeline.create(); -// -// PCollection<String> output = -// p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", 0).set -// ("contributor_username", "user1"), new TableRow().set("timestamp", 1).set -// ("contributor_username", "user1"), new TableRow().set("timestamp", 2).set -// ("contributor_username", "user1"), new TableRow().set("timestamp", 0).set -// ("contributor_username", "user2"), new TableRow().set("timestamp", 1).set -// ("contributor_username", "user2"), new TableRow().set("timestamp", 3601).set -// ("contributor_username", "user2"), new TableRow().set("timestamp", 3602).set -// ("contributor_username", "user2"), new TableRow().set("timestamp", 35 * 24 * 3600) -// .set("contributor_username", "user3")))) -// .apply(new TopWikipediaSessions.ComputeTopSessions(1.0)); -// -// output.apply(TextIO.Write.to(resultPath)); -// -// p.run(); -// } -//} +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Arrays; + + +/** + * Session window test + */ +public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { + protected String resultPath; + + public TopWikipediaSessionsITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "user: user1 value:3", + "user: user1 value:1", + "user: user2 value:4", + "user: user2 value:6", + "user: user3 value:7", + "user: user3 value:2" + }; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createStreaming(); + + long now = System.currentTimeMillis() + 10000; + System.out.println((now + 5000) / 1000); + + PCollection<KV<String, Long>> output = + p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now) + .set("contributor_username", "user3")))) + + + + .apply(ParDo.of(new DoFn<TableRow, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + TableRow row = c.element(); + long timestamp = (Long) row.get("timestamp"); + String userName = (String) row.get("contributor_username"); + if (userName != null) { + // Sets the timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); + } + } + })) + + .apply(ParDo.named("SampleUsers").of( + new DoFn<String, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) { + c.output(c.element()); + } + } + })) + + .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1)))) + .apply(Count.<String>perElement()); + + PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<String, Long> el = c.element(); + String out = "user: " + el.getKey() + " value:" + el.getValue(); + System.out.println(out); + c.output(out); + } + })); + + format.apply(TextIO.Write.to(resultPath)); + + p.run(); + } +}
