http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
new file mode 100644
index 0000000..e6c5ae2
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter;
+import com.google.cloud.dataflow.sdk.coders.*;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.state.*;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class StateSerializationTest {
+
+  private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
+  private static final String KEY_PREFIX = "TEST_";
+
+  // TODO: This can be replaced with the standard Sum.SumIntegerFn once the state no longer needs
+  // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
+  // by type, and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
+  private static final CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
+      new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
+        @Override
+        public int[] createAccumulator(Object key, CombineWithContext.Context c) {
+          return new int[1];
+        }
+
+        @Override
+        public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
+          accumulator[0] += value;
+          return accumulator;
+        }
+
+        @Override
+        public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
+          int[] r = new int[1];
+          for (int[] a : accumulators) {
+            r[0] += a[0];
+          }
+          return r;
+        }
+
+        @Override
+        public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
+          return accumulator[0];
+        }
+      };
+
+  private static final Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
+      VarIntCoder.of(),
+      new DelegateCoder.CodingFunction<int[], Integer>() {
+        @Override
+        public Integer apply(int[] accumulator) {
+          return accumulator[0];
+        }
+      },
+      new DelegateCoder.CodingFunction<Integer, int[]>() {
+        @Override
+        public int[] apply(Integer value) {
+          int[] a = new int[1];
+          a[0] = value;
+          return a;
+        }
+      });
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
+      StateTags.value("intValue", VarIntCoder.of());
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
+      StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+  private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
+
+  private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private void initializeStateAndTimers() throws CannotProvideCoderException {
+    for (int i = 0; i < 10; i++) {
+      String key = KEY_PREFIX + i;
+
+      FlinkStateInternals<String> state = initializeStateForKey(key);
+      Set<TimerInternals.TimerData> timers = new HashSet<>();
+      for (int j = 0; j < 5; j++) {
+        TimerInternals.TimerData timer = TimerInternals.TimerData.of(
+            NAMESPACE_1, new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
+        timers.add(timer);
+      }
+
+      statePerKey.put(key, state);
+      activeTimers.put(key, timers);
+    }
+  }
+
+  private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = createState(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    value.write("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    value2.write(4);
+    value2.write(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    combiningValue.add(1);
+    combiningValue.add(2);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    watermark.add(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    bag.add("v1");
+    bag.add("v2");
+    bag.add("v3");
+    bag.add("v4");
+    return state;
+  }
+
+  private boolean restoreAndTestState(DataInputView in) throws Exception {
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+    final ClassLoader userClassloader = this.getClass().getClassLoader();
+    Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    boolean comparisonRes = true;
+
+    for (String key : statePerKey.keySet()) {
+      comparisonRes &= checkStateForKey(key);
+    }
+
+    // restore the timers
+    Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey =
+        StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+    if (activeTimers.size() != restoredTimersPerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
+      Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
+      comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
+    }
+
+    // restore the state
+    Map<String, FlinkStateInternals<String>> restoredPerKeyState =
+        StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader);
+    if (restoredPerKeyState.size() != statePerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      FlinkStateInternals<String> originalState = statePerKey.get(key);
+      FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
+      comparisonRes &= checkStateForKey(originalState, restoredState);
+    }
+    return comparisonRes;
+  }
+
+  private boolean checkStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = statePerKey.get(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = value.read().equals("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= value2.read().equals(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combiningValue.read().equals(3);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= watermark.read().equals(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    Iterator<String> it = bag.read().iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      comp &= it.next().equals("v" + (++i));
+    }
+    return comp;
+  }
+
+  private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
+    StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
+  }
+
+  private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
+    boolean comp = true;
+    if (restoredTimers == null) {
+      return false;
+    }
+
+    if (originalTimers.size() != restoredTimers.size()) {
+      return false;
+    }
+
+    for (TimerInternals.TimerData timer : originalTimers) {
+      comp &= restoredTimers.contains(timer);
+    }
+    return comp;
+  }
+
+  private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException {
+    if (restoredState == null) {
+      return false;
+    }
+
+    ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = orValue.read().equals(resValue.read());
+
+    ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= orIntValue.read().equals(resIntValue.read());
+
+    AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combOrValue.read().equals(combResValue.read());
+
+    WatermarkHoldState<BoundedWindow> orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    WatermarkHoldState<BoundedWindow> resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= orWatermark.read().equals(resWatermark.read());
+
+    BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    Iterator<String> orIt = orBag.read().iterator();
+    Iterator<String> resIt = resBag.read().iterator();
+
+    while (orIt.hasNext() && resIt.hasNext()) {
+      comp &= orIt.next().equals(resIt.next());
+    }
+
+    // Both bags must also be exhausted, i.e. contain the same number of elements.
+    return comp && orIt.hasNext() == resIt.hasNext();
+  }
+
+  private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
+    return new FlinkStateInternals<>(
+        key,
+        StringUtf8Coder.of(),
+        IntervalWindow.getCoder(),
+        OutputTimeFns.outputAtEarliestInputTimestamp());
+  }
+
+  @Test
+  public void test() throws Exception {
+    StateSerializationTest test = new StateSerializationTest();
+    test.initializeStateAndTimers();
+
+    MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
+    AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
+
+    test.storeState(out);
+
+    byte[] contents = memBackend.closeAndGetBytes();
+    DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
+
+    assertEquals(true, test.restoreAndTestState(in));
+  }
+
+}
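
The DelegateCoder construction above is the reusable piece of this test: it encodes the single-cell int[] accumulator by delegating to a VarIntCoder. As a minimal stand-alone sketch of that pattern (assuming only the same Dataflow SDK on the classpath; the class name AccumCoderRoundTrip is illustrative and not part of this commit):

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.util.CoderUtils;

public class AccumCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    // Same construction as INT_ACCUM_CODER above: the single-cell int[] accumulator
    // is mapped to its Integer value before encoding, and mapped back after decoding.
    Coder<int[]> coder = DelegateCoder.of(
        VarIntCoder.of(),
        new DelegateCoder.CodingFunction<int[], Integer>() {
          @Override
          public Integer apply(int[] accumulator) {
            return accumulator[0];
          }
        },
        new DelegateCoder.CodingFunction<Integer, int[]>() {
          @Override
          public int[] apply(Integer value) {
            return new int[] { value };
          }
        });

    // Round-trip an accumulator through the delegate coder.
    byte[] bytes = CoderUtils.encodeToByteArray(coder, new int[] { 42 });
    int[] restored = CoderUtils.decodeFromByteArray(coder, bytes);
    System.out.println(restored[0]); // prints 42
  }
}

This is the same trick the test relies on when checkpointing the combining state: only the Integer cell is ever written to the checkpoint stream, so the coder stays compatible with VarIntCoder-encoded data.
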
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
new file mode 100644
index 0000000..f0b93a0
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Session window test.
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase() {
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    Long now = (System.currentTimeMillis() + 10000) / 1000;
+
+    PCollection<KV<String, Long>> output =
+        p.apply(Create.of(Arrays.asList(
+            new TableRow().set("timestamp", now).set("contributor_username", "user1"),
+            new TableRow().set("timestamp", now + 10).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now).set("contributor_username", "user1"),
+            new TableRow().set("timestamp", now + 2).set("contributor_username", "user1"),
+            new TableRow().set("timestamp", now).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 1).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 5).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 7).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 8).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 200).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 230).set("contributor_username", "user1"),
+            new TableRow().set("timestamp", now + 230).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 240).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now + 245).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 235).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 236).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 237).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 238).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 239).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 240).set("contributor_username", "user3"),
+            new TableRow().set("timestamp", now + 241).set("contributor_username", "user2"),
+            new TableRow().set("timestamp", now).set("contributor_username", "user3"))))
+
+        .apply(ParDo.of(new DoFn<TableRow, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            TableRow row = c.element();
+            // The "timestamp" field was set from a Long above, so read it as a Number
+            // rather than casting to Integer, which would throw a ClassCastException.
+            long timestamp = ((Number) row.get("timestamp")).longValue();
+            String userName = (String) row.get("contributor_username");
+            if (userName != null) {
+              // Sets the timestamp field to be used in windowing.
+              c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+            }
+          }
+        }))
+
+        .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+
+        .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        KV<String, Long> el = c.element();
+        String out = "user: " + el.getKey() + " value:" + el.getValue();
+        c.output(out);
+      }
+    }));
+
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
new file mode 100644
index 0000000..620dace
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.util;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+/**
+ * Copied from {@link com.google.cloud.dataflow.examples.JoinExamples} because the code
+ * is private there.
+ */
+public class JoinExamples {
+
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Joins two collections, using country code as the key.
+   */
+  public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+      PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<>();
+    final TupleTag<String> countryInfoTag = new TupleTag<>();
+
+    // Transform both input collections to tuple collections, where the keys are country
+    // codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<String, CoGbkResult> e = c.element();
+            String countryCode = e.getKey();
+            String countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
+            for (String eventInfo : e.getValue().getAll(eventInfoTag)) {
+              // Generate a string that combines information from both collection values.
+              c.output(KV.of(countryCode, "Country name: " + countryName
+                  + ", Event info: " + eventInfo));
+            }
+          }
+        }));
+
+    // format the joined results as human-readable strings
+    return finalResultCollection
+        .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputString = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputString);
+          }
+        }));
+  }
+
+  /**
+   * Examines each row (event) in the input table. Outputs a KV where the key is the
+   * country code of the event and the value is a string encoding the event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+
+  /**
+   * Examines each row (country info) in the input table. Outputs a KV where the key is
+   * the country code and the value is the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // The following two 'apply' calls create the two inputs to our pipeline, one for
+    // each of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
+
+}
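
The join above is the standard CoGroupByKey pattern: tag each keyed input with a TupleTag, group, then pull each side out of the CoGbkResult. A minimal runnable sketch of the same pattern on in-memory data (assuming only the same Dataflow SDK on the classpath; the class name CoGbkSketch and the sample rows are illustrative, not part of this commit):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TupleTag;

public class CoGbkSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    final TupleTag<String> eventsTag = new TupleTag<>();
    final TupleTag<String> namesTag = new TupleTag<>();

    // Two keyed collections sharing the country code as key.
    PCollection<KV<String, String>> events = p.apply(
        Create.of(KV.of("DE", "event A"), KV.of("DE", "event B"), KV.of("FR", "event C"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
    PCollection<KV<String, String>> names = p.apply(
        Create.of(KV.of("DE", "Germany"), KV.of("FR", "France"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Group both collections by key, then read each side out of the CoGbkResult
    // exactly as joinEvents() does with getOnly()/getAll().
    KeyedPCollectionTuple.of(eventsTag, events)
        .and(namesTag, names)
        .apply(CoGroupByKey.<String>create())
        .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
          @Override
          public void processElement(ProcessContext c) {
            String countryName = c.element().getValue().getOnly(namesTag, "unknown");
            for (String event : c.element().getValue().getAll(eventsTag)) {
              c.output(countryName + ": " + event);
            }
          }
        }));

    p.run();
  }
}

As in joinEvents(), getOnly() takes a default value that is returned when a key has no entry on that side of the join, which is what makes the join effectively left-outer with respect to the events collection.
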