http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java new file mode 100644 index 0000000..84007af --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -0,0 +1,713 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; +import com.google.cloud.dataflow.sdk.util.state.*; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.flink.util.InstantiationUtil; +import org.joda.time.Instant; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.*; + +/** + * An implementation of the Beam {@link StateInternals}. This implementation simply keeps elements in memory. + * This state is periodically checkpointed by Flink, for fault-tolerance. + * + * TODO: State should be rewritten to redirect to Flink per-key state so that coders and combiners don't need + * to be serialized along with encoded values when snapshotting. + */ +public class FlinkStateInternals<K> implements StateInternals<K> { + + private final K key; + + private final Coder<K> keyCoder; + + private final Coder<? extends BoundedWindow> windowCoder; + + private final OutputTimeFn<? super BoundedWindow> outputTimeFn; + + private Instant watermarkHoldAccessor; + + public FlinkStateInternals(K key, + Coder<K> keyCoder, + Coder<? extends BoundedWindow> windowCoder, + OutputTimeFn<? super BoundedWindow> outputTimeFn) { + this.key = key; + this.keyCoder = keyCoder; + this.windowCoder = windowCoder; + this.outputTimeFn = outputTimeFn; + } + + public Instant getWatermarkHold() { + return watermarkHoldAccessor; + } + + /** + * This is the interface state has to implement in order for it to be fault tolerant when + * executed by the FlinkPipelineRunner. + */ + private interface CheckpointableIF { + + boolean shouldPersist(); + + void persistState(StateCheckpointWriter checkpointBuilder) throws IOException; + } + + protected final StateTable<K> inMemoryState = new StateTable<K>() { + @Override + protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) { + return new StateTag.StateBinder<K>() { + + @Override + public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder); + } + + @Override + public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { + return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) { + return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn); + } + }; + } + }; + + @Override + public K getKey() { + return key; + } + + @Override + public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) { + return inMemoryState.get(namespace, address, null); + } + + @Override + public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) { + return inMemoryState.get(namespace, address, c); + } + + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + checkpointBuilder.writeInt(getNoOfElements()); + + for (State location : inMemoryState.values()) { + if (!(location instanceof CheckpointableIF)) { + throw new IllegalStateException(String.format( + "%s wasn't created by %s -- unable to persist it", + location.getClass().getSimpleName(), + getClass().getSimpleName())); + } + ((CheckpointableIF) location).persistState(checkpointBuilder); + } + } + + public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader) + throws IOException, ClassNotFoundException { + + // the number of elements to read. + int noOfElements = checkpointReader.getInt(); + for (int i = 0; i < noOfElements; i++) { + decodeState(checkpointReader, loader); + } + } + + /** + * We remove the first character which encodes the type of the stateTag ('s' for system + * and 'u' for user). For more details check out the source of + * {@link StateTags.StateTagBase#getId()}. + */ + private void decodeState(StateCheckpointReader reader, ClassLoader loader) + throws IOException, ClassNotFoundException { + + StateType stateItemType = StateType.deserialize(reader); + ByteString stateKey = reader.getTag(); + + // first decode the namespace and the tagId... + String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+"); + if (namespaceAndTag.length != 2) { + throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + "."); + } + StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder); + + // ... decide if it is a system or user stateTag... + char ownerTag = namespaceAndTag[1].charAt(0); + if (ownerTag != 's' && ownerTag != 'u') { + throw new RuntimeException("Invalid StateTag name."); + } + boolean isSystemTag = ownerTag == 's'; + String tagId = namespaceAndTag[1].substring(1); + + // ...then decode the coder (if there is one)... + Coder<?> coder = null; + switch (stateItemType) { + case VALUE: + case LIST: + case ACCUMULATOR: + ByteString coderBytes = reader.getData(); + coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader); + break; + case WATERMARK: + break; + } + + // ...then decode the combiner function (if there is one)... + CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null; + switch (stateItemType) { + case ACCUMULATOR: + ByteString combinerBytes = reader.getData(); + combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader); + break; + case VALUE: + case LIST: + case WATERMARK: + break; + } + + //... and finally, depending on the type of the state being decoded, + // 1) create the adequate stateTag, + // 2) create the state container, + // 3) restore the actual content. + switch (stateItemType) { + case VALUE: { + StateTag stateTag = StateTags.value(tagId, coder); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + @SuppressWarnings("unchecked") + FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null); + value.restoreState(reader); + break; + } + case WATERMARK: { + @SuppressWarnings("unchecked") + StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + @SuppressWarnings("unchecked") + FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null); + watermark.restoreState(reader); + break; + } + case LIST: { + StateTag stateTag = StateTags.bag(tagId, coder); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null); + bag.restoreState(reader); + break; + } + case ACCUMULATOR: { + @SuppressWarnings("unchecked") + StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + @SuppressWarnings("unchecked") + FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue = + (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null); + combiningValue.restoreState(reader); + break; + } + default: + throw new RuntimeException("Unknown State Type " + stateItemType + "."); + } + } + + private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) { + StringBuilder sb = new StringBuilder(); + try { + namespace.appendTo(sb); + sb.append('+'); + address.appendTo(sb); + } catch (IOException e) { + throw new RuntimeException(e); + } + return ByteString.copyFromUtf8(sb.toString()); + } + + private int getNoOfElements() { + int noOfElements = 0; + for (State state : inMemoryState.values()) { + if (!(state instanceof CheckpointableIF)) { + throw new RuntimeException("State Implementations used by the " + + "Flink Dataflow Runner should implement the CheckpointableIF interface."); + } + + if (((CheckpointableIF) state).shouldPersist()) { + noOfElements++; + } + } + return noOfElements; + } + + private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF { + + private final ByteString stateKey; + private final Coder<T> elemCoder; + + private T value = null; + + public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) { + this.stateKey = stateKey; + this.elemCoder = elemCoder; + } + + @Override + public void clear() { + value = null; + } + + @Override + public void write(T input) { + this.value = input; + } + + @Override + public T read() { + return value; + } + + @Override + public ValueState<T> readLater() { + // Ignore + return this; + } + + @Override + public boolean shouldPersist() { + return value != null; + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (value != null) { + // serialize the coder. + byte[] coder = InstantiationUtil.serializeObject(elemCoder); + + // encode the value into a ByteString + ByteString.Output stream = ByteString.newOutput(); + elemCoder.encode(value, stream, Coder.Context.OUTER); + ByteString data = stream.toByteString(); + + checkpointBuilder.addValueBuilder() + .setTag(stateKey) + .setData(coder) + .setData(data); + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + ByteString valueContent = checkpointReader.getData(); + T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER); + write(outValue); + } + } + + private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow> + implements WatermarkHoldState<W>, CheckpointableIF { + + private final ByteString stateKey; + + private Instant minimumHold = null; + + private OutputTimeFn<? super W> outputTimeFn; + + public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) { + this.stateKey = stateKey; + this.outputTimeFn = outputTimeFn; + } + + @Override + public void clear() { + // Even though we're clearing we can't remove this from the in-memory state map, since + // other users may already have a handle on this WatermarkBagInternal. + minimumHold = null; + watermarkHoldAccessor = null; + } + + @Override + public void add(Instant watermarkHold) { + if (minimumHold == null || minimumHold.isAfter(watermarkHold)) { + watermarkHoldAccessor = watermarkHold; + minimumHold = watermarkHold; + } + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public Boolean read() { + return minimumHold == null; + } + + @Override + public ReadableState<Boolean> readLater() { + // Ignore + return this; + } + }; + } + + @Override + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; + } + + @Override + public Instant read() { + return minimumHold; + } + + @Override + public WatermarkHoldState<W> readLater() { + // Ignore + return this; + } + + @Override + public String toString() { + return Objects.toString(minimumHold); + } + + @Override + public boolean shouldPersist() { + return minimumHold != null; + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (minimumHold != null) { + checkpointBuilder.addWatermarkHoldsBuilder() + .setTag(stateKey) + .setTimestamp(minimumHold); + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + Instant watermark = checkpointReader.getTimestamp(); + add(watermark); + } + } + + + private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext( + final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() { + @Override + public AccumT createAccumulator(K key, CombineWithContext.Context c) { + return combineFn.createAccumulator(key); + } + + @Override + public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) { + return combineFn.addInput(key, accumulator, value); + } + + @Override + public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) { + return combineFn.mergeAccumulators(key, accumulators); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) { + return combineFn.extractOutput(key, accumulator); + } + }; + } + + private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext( + final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() { + @Override + public AccumT createAccumulator(K key, CombineWithContext.Context c) { + return combineFn.createAccumulator(); + } + + @Override + public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) { + return combineFn.addInput(accumulator, value); + } + + @Override + public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) { + return combineFn.mergeAccumulators(accumulators); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) { + return combineFn.extractOutput(accumulator); + } + }; + } + + private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT> + implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF { + + private final ByteString stateKey; + private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn; + private final Coder<AccumT> accumCoder; + private final CombineWithContext.Context context; + + private AccumT accum = null; + private boolean isClear = true; + + private FlinkInMemoryKeyedCombiningValue(ByteString stateKey, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn, + Coder<AccumT> accumCoder, + final StateContext<?> stateContext) { + this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext); + } + + + private FlinkInMemoryKeyedCombiningValue(ByteString stateKey, + Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn, + Coder<AccumT> accumCoder, + final StateContext<?> stateContext) { + this(stateKey, withContext(combineFn), accumCoder, stateContext); + } + + private FlinkInMemoryKeyedCombiningValue(ByteString stateKey, + CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn, + Coder<AccumT> accumCoder, + final StateContext<?> stateContext) { + Preconditions.checkNotNull(combineFn); + Preconditions.checkNotNull(accumCoder); + + this.stateKey = stateKey; + this.combineFn = combineFn; + this.accumCoder = accumCoder; + this.context = new CombineWithContext.Context() { + @Override + public PipelineOptions getPipelineOptions() { + return stateContext.getPipelineOptions(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return stateContext.sideInput(view); + } + }; + accum = combineFn.createAccumulator(key, context); + } + + @Override + public void clear() { + accum = combineFn.createAccumulator(key, context); + isClear = true; + } + + @Override + public void add(InputT input) { + isClear = false; + accum = combineFn.addInput(key, accum, input, context); + } + + @Override + public AccumT getAccum() { + return accum; + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + // Ignore + return this; + } + + @Override + public Boolean read() { + return isClear; + } + }; + } + + @Override + public void addAccum(AccumT accum) { + isClear = false; + this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context); + } + + @Override + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return combineFn.mergeAccumulators(key, accumulators, context); + } + + @Override + public OutputT read() { + return combineFn.extractOutput(key, accum, context); + } + + @Override + public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() { + // Ignore + return this; + } + + @Override + public boolean shouldPersist() { + return !isClear; + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (!isClear) { + // serialize the coder. + byte[] coder = InstantiationUtil.serializeObject(accumCoder); + + // serialize the combiner. + byte[] combiner = InstantiationUtil.serializeObject(combineFn); + + // encode the accumulator into a ByteString + ByteString.Output stream = ByteString.newOutput(); + accumCoder.encode(accum, stream, Coder.Context.OUTER); + ByteString data = stream.toByteString(); + + // put the flag that the next serialized element is an accumulator + checkpointBuilder.addAccumulatorBuilder() + .setTag(stateKey) + .setData(coder) + .setData(combiner) + .setData(data); + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + ByteString valueContent = checkpointReader.getData(); + AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER); + addAccum(accum); + } + } + + private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF { + private final List<T> contents = new ArrayList<>(); + + private final ByteString stateKey; + private final Coder<T> elemCoder; + + public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) { + this.stateKey = stateKey; + this.elemCoder = elemCoder; + } + + @Override + public void clear() { + contents.clear(); + } + + @Override + public Iterable<T> read() { + return contents; + } + + @Override + public BagState<T> readLater() { + // Ignore + return this; + } + + @Override + public void add(T input) { + contents.add(input); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + // Ignore + return this; + } + + @Override + public Boolean read() { + return contents.isEmpty(); + } + }; + } + + @Override + public boolean shouldPersist() { + return !contents.isEmpty(); + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (!contents.isEmpty()) { + // serialize the coder. + byte[] coder = InstantiationUtil.serializeObject(elemCoder); + + checkpointBuilder.addListUpdatesBuilder() + .setTag(stateKey) + .setData(coder) + .writeInt(contents.size()); + + for (T item : contents) { + // encode the element + ByteString.Output stream = ByteString.newOutput(); + elemCoder.encode(item, stream, Coder.Context.OUTER); + ByteString data = stream.toByteString(); + + // add the data to the checkpoint. + checkpointBuilder.setData(data); + } + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + int noOfValues = checkpointReader.getInt(); + for (int j = 0; j < noOfValues; j++) { + ByteString valueContent = checkpointReader.getData(); + T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER); + add(outValue); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java new file mode 100644 index 0000000..d73ac8c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java @@ -0,0 +1,89 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import com.google.protobuf.ByteString; +import org.apache.flink.core.memory.DataInputView; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class StateCheckpointReader { + + private final DataInputView input; + + public StateCheckpointReader(DataInputView in) { + this.input = in; + } + + public ByteString getTag() throws IOException { + return ByteString.copyFrom(readRawData()); + } + + public String getTagToString() throws IOException { + return input.readUTF(); + } + + public ByteString getData() throws IOException { + return ByteString.copyFrom(readRawData()); + } + + public int getInt() throws IOException { + validate(); + return input.readInt(); + } + + public byte getByte() throws IOException { + validate(); + return input.readByte(); + } + + public Instant getTimestamp() throws IOException { + validate(); + Long watermarkMillis = input.readLong(); + return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis)); + } + + public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException { + return deserializeObject(keySerializer); + } + + public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException { + return objectSerializer.deserialize(input); + } + + ///////// Helper Methods /////// + + private byte[] readRawData() throws IOException { + validate(); + int size = input.readInt(); + + byte[] serData = new byte[size]; + int bytesRead = input.read(serData); + if (bytesRead != size) { + throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream."); + } + return serData; + } + + private void validate() { + if (this.input == null) { + throw new RuntimeException("StateBackend not initialized yet."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java new file mode 100644 index 0000000..055a12a --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java @@ -0,0 +1,153 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.state.StateNamespace; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class StateCheckpointUtils { + + public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals, + StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException { + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + + int noOfKeys = perKeyStateInternals.size(); + writer.writeInt(noOfKeys); + for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) { + K key = keyStatePair.getKey(); + FlinkStateInternals<K> state = keyStatePair.getValue(); + + // encode the key + writer.serializeKey(key, keySerializer); + + // write the associated state + state.persistState(writer); + } + } + + public static <K> Map<K, FlinkStateInternals<K>> decodeState( + StateCheckpointReader reader, + OutputTimeFn<? super BoundedWindow> outputTimeFn, + Coder<K> keyCoder, + Coder<? extends BoundedWindow> windowCoder, + ClassLoader classLoader) throws IOException, ClassNotFoundException { + + int noOfKeys = reader.getInt(); + Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys); + perKeyStateInternals.clear(); + + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + for (int i = 0; i < noOfKeys; i++) { + + // decode the key. + K key = reader.deserializeKey(keySerializer); + + //decode the state associated to the key. + FlinkStateInternals<K> stateForKey = + new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn); + stateForKey.restoreState(reader, classLoader); + perKeyStateInternals.put(key, stateForKey); + } + return perKeyStateInternals; + } + + ////////////// Encoding/Decoding the Timers //////////////// + + + public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers, + StateCheckpointWriter writer, + Coder<K> keyCoder) throws IOException { + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + + int noOfKeys = allTimers.size(); + writer.writeInt(noOfKeys); + for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) { + K key = timersPerKey.getKey(); + + // encode the key + writer.serializeKey(key, keySerializer); + + // write the associated timers + Set<TimerInternals.TimerData> timers = timersPerKey.getValue(); + encodeTimerDataForKey(writer, timers); + } + } + + public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers( + StateCheckpointReader reader, + Coder<? extends BoundedWindow> windowCoder, + Coder<K> keyCoder) throws IOException { + + int noOfKeys = reader.getInt(); + Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys); + activeTimers.clear(); + + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + for (int i = 0; i < noOfKeys; i++) { + + // decode the key. + K key = reader.deserializeKey(keySerializer); + + // decode the associated timers. + Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder); + activeTimers.put(key, timers); + } + return activeTimers; + } + + private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException { + // encode timers + writer.writeInt(timers.size()); + for (TimerInternals.TimerData timer : timers) { + String stringKey = timer.getNamespace().stringKey(); + + writer.setTag(stringKey); + writer.setTimestamp(timer.getTimestamp()); + writer.writeInt(timer.getDomain().ordinal()); + } + } + + private static Set<TimerInternals.TimerData> decodeTimerDataForKey( + StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException { + + // decode the timers: first their number and then the content itself. + int noOfTimers = reader.getInt(); + Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers); + for (int i = 0; i < noOfTimers; i++) { + String stringKey = reader.getTagToString(); + Instant instant = reader.getTimestamp(); + TimeDomain domain = TimeDomain.values()[reader.getInt()]; + + StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder); + timers.add(TimerInternals.TimerData.of(namespace, instant, domain)); + } + return timers; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java new file mode 100644 index 0000000..738ce5f --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java @@ -0,0 +1,127 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import com.google.protobuf.ByteString; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class StateCheckpointWriter { + + private final AbstractStateBackend.CheckpointStateOutputView output; + + public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) { + return new StateCheckpointWriter(output); + } + + private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) { + this.output = output; + } + + ///////// Creating the serialized versions of the different types of state held by dataflow /////// + + public StateCheckpointWriter addValueBuilder() throws IOException { + validate(); + StateType.serialize(StateType.VALUE, this); + return this; + } + + public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException { + validate(); + StateType.serialize(StateType.WATERMARK, this); + return this; + } + + public StateCheckpointWriter addListUpdatesBuilder() throws IOException { + validate(); + StateType.serialize(StateType.LIST, this); + return this; + } + + public StateCheckpointWriter addAccumulatorBuilder() throws IOException { + validate(); + StateType.serialize(StateType.ACCUMULATOR, this); + return this; + } + + ///////// Setting the tag for a given state element /////// + + public StateCheckpointWriter setTag(ByteString stateKey) throws IOException { + return writeData(stateKey.toByteArray()); + } + + public StateCheckpointWriter setTag(String stateKey) throws IOException { + output.writeUTF(stateKey); + return this; + } + + + public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException { + return serializeObject(key, keySerializer); + } + + public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException { + objectSerializer.serialize(object, output); + return this; + } + + ///////// Write the actual serialized data ////////// + + public StateCheckpointWriter setData(ByteString data) throws IOException { + return writeData(data.toByteArray()); + } + + public StateCheckpointWriter setData(byte[] data) throws IOException { + return writeData(data); + } + + public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException { + validate(); + output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis())); + return this; + } + + public StateCheckpointWriter writeInt(int number) throws IOException { + validate(); + output.writeInt(number); + return this; + } + + public StateCheckpointWriter writeByte(byte b) throws IOException { + validate(); + output.writeByte(b); + return this; + } + + ///////// Helper Methods /////// + + private StateCheckpointWriter writeData(byte[] data) throws IOException { + validate(); + output.writeInt(data.length); + output.write(data); + return this; + } + + private void validate() { + if (this.output == null) { + throw new RuntimeException("StateBackend not initialized yet."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java new file mode 100644 index 0000000..8b20600 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java @@ -0,0 +1,71 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import java.io.IOException; + +/** + * The available types of state, as provided by the Beam SDK. This class is used for serialization/deserialization + * purposes. + * */ +public enum StateType { + + VALUE(0), + + WATERMARK(1), + + LIST(2), + + ACCUMULATOR(3); + + private final int numVal; + + StateType(int value) { + this.numVal = value; + } + + public static void serialize(StateType type, StateCheckpointWriter output) throws IOException { + if (output == null) { + throw new IllegalArgumentException("Cannot write to a null output."); + } + + if(type.numVal < 0 || type.numVal > 3) { + throw new RuntimeException("Unknown State Type " + type + "."); + } + + output.writeByte((byte) type.numVal); + } + + public static StateType deserialize(StateCheckpointReader input) throws IOException { + if (input == null) { + throw new IllegalArgumentException("Cannot read from a null input."); + } + + int typeInt = (int) input.getByte(); + if(typeInt < 0 || typeInt > 3) { + throw new RuntimeException("Unknown State Type " + typeInt + "."); + } + + StateType resultType = null; + for(StateType st: values()) { + if(st.numVal == typeInt) { + resultType = st; + break; + } + } + return resultType; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java deleted file mode 100644 index eaa5979..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.common.base.Joiner; -import org.apache.flink.api.io.avro.example.User; -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class AvroITCase extends JavaProgramTestBase { - - protected String resultPath; - protected String tmpPath; - - public AvroITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "Joe red 3", - "Mary blue 4", - "Mark green 1", - "Julia purple 5" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - tmpPath = getTempDirPath("tmp"); - - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(tmpPath, resultPath); - } - - private static void runProgram(String tmpPath, String resultPath) { - Pipeline p = FlinkTestPipeline.createForBatch(); - - p - .apply(Create.of( - new User("Joe", 3, "red"), - new User("Mary", 4, "blue"), - new User("Mark", 1, "green"), - new User("Julia", 5, "purple")) - .withCoder(AvroCoder.of(User.class))) - - .apply(AvroIO.Write.to(tmpPath) - .withSchema(User.class)); - - p.run(); - - p = FlinkTestPipeline.createForBatch(); - - p - .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation()) - - .apply(ParDo.of(new DoFn<User, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - User u = c.element(); - String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber(); - c.output(result); - } - })) - - .apply(TextIO.Write.to(resultPath)); - - p.run(); - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java deleted file mode 100644 index 79eb163..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -public class FlattenizeITCase extends JavaProgramTestBase { - - private String resultPath; - private String resultPath2; - - private static final String[] words = {"hello", "this", "is", "a", "DataSet!"}; - private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"}; - private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - resultPath2 = getTempDirPath("result2"); - } - - @Override - protected void postSubmit() throws Exception { - String join = Joiner.on('\n').join(words); - String join2 = Joiner.on('\n').join(words2); - String join3 = Joiner.on('\n').join(words3); - compareResultsByLinesInMemory(join + "\n" + join2, resultPath); - compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2); - } - - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> p1 = p.apply(Create.of(words)); - PCollection<String> p2 = p.apply(Create.of(words2)); - - PCollectionList<String> list = PCollectionList.of(p1).and(p2); - - list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath)); - - PCollection<String> p3 = p.apply(Create.of(words3)); - - PCollectionList<String> list2 = list.and(p3); - - list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2)); - - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java deleted file mode 100644 index 2dcebde..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; - -/** - * {@link com.google.cloud.dataflow.sdk.Pipeline} for testing Dataflow programs on the - * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}. - */ -public class FlinkTestPipeline extends Pipeline { - - /** - * Creates and returns a new test pipeline for batch execution. - * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - */ - public static FlinkTestPipeline createForBatch() { - return create(false); - } - - /** - * Creates and returns a new test pipeline for streaming execution. - * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - * - * @return The Test Pipeline - */ - public static FlinkTestPipeline createForStreaming() { - return create(true); - } - - /** - * Creates and returns a new test pipeline for streaming or batch execution. - * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - * - * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. - * @return The Test Pipeline. - */ - private static FlinkTestPipeline create(boolean streaming) { - FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming); - return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); - } - - private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, - PipelineOptions options) { - super(runner, options); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java deleted file mode 100644 index 11b6ce4..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.runners.flink.util.JoinExamples; -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.Arrays; -import java.util.List; - - -/** - * Unfortunately we need to copy the code from the Dataflow SDK because it is not public there. - */ -public class JoinExamplesITCase extends JavaProgramTestBase { - - protected String resultPath; - - public JoinExamplesITCase(){ - } - - private static final TableRow row1 = new TableRow() - .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") - .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com"); - private static final TableRow row2 = new TableRow() - .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") - .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com"); - private static final TableRow row3 = new TableRow() - .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213") - .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com"); - static final TableRow[] EVENTS = new TableRow[] { - row1, row2, row3 - }; - static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS); - - private static final TableRow cc1 = new TableRow() - .set("FIPSCC", "VM").set("HumanName", "Vietnam"); - private static final TableRow cc2 = new TableRow() - .set("FIPSCC", "BE").set("HumanName", "Belgium"); - static final TableRow[] CCS = new TableRow[] { - cc1, cc2 - }; - static final List<TableRow> CC_ARRAY = Arrays.asList(CCS); - - static final String[] JOINED_EVENTS = new String[] { - "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, " - + "url: http://www.chicagotribune.com", - "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, " - + "url: http://cnn.com", - "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, " - + "url: http://cnn.com" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY)); - PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY)); - - PCollection<String> output = JoinExamples.joinEvents(input1, input2); - - output.apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java deleted file mode 100644 index e39b81d..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.io.Serializable; - -public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable { - - protected String resultPath; - - protected final String expected = "test"; - - public MaybeEmptyTestITCase() { - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - p.apply(Create.of((Void) null)).setCoder(VoidCoder.of()) - .apply(ParDo.of( - new DoFn<Void, String>() { - @Override - public void processElement(DoFn<Void, String>.ProcessContext c) { - c.output(expected); - } - })).apply(TextIO.Write.to(resultPath)); - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java deleted file mode 100644 index 08e5323..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionTuple; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.cloud.dataflow.sdk.values.TupleTagList; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.io.Serializable; - -public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable { - - private String resultPath; - - private static String[] expectedWords = {"MAAA", "MAAFOOO"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath); - } - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO")); - - // Select words whose length is below a cut off, - // plus the lengths of words that are above the cut off. - // Also select words starting with "MARKER". - final int wordLengthCutOff = 3; - // Create tags to use for the main and side outputs. - final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; - final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; - final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; - - PCollectionTuple results = - words.apply(ParDo - .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag) - .and(markedWordsTag)) - .of(new DoFn<String, String>() { - final TupleTag<String> specialWordsTag = new TupleTag<String>() { - }; - - public void processElement(ProcessContext c) { - String word = c.element(); - if (word.length() <= wordLengthCutOff) { - c.output(word); - } else { - c.sideOutput(wordLengthsAboveCutOffTag, word.length()); - } - if (word.startsWith("MAA")) { - c.sideOutput(markedWordsTag, word); - } - - if (word.startsWith("SPECIAL")) { - c.sideOutput(specialWordsTag, word); - } - } - })); - - // Extract the PCollection results, by tag. - PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag); - PCollection<Integer> wordLengthsAboveCutOff = results.get - (wordLengthsAboveCutOffTag); - PCollection<String> markedWords = results.get(markedWordsTag); - - markedWords.apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java deleted file mode 100644 index 7202417..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - - -public class ReadSourceITCase extends JavaProgramTestBase { - - protected String resultPath; - - public ReadSourceITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "1", "2", "3", "4", "5", "6", "7", "8", "9"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(resultPath); - } - - private static void runProgram(String resultPath) { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> result = p - .apply(Read.from(new ReadSource(1, 10))) - .apply(ParDo.of(new DoFn<Integer, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element().toString()); - } - })); - - result.apply(TextIO.Write.to(resultPath)); - p.run(); - } - - - private static class ReadSource extends BoundedSource<Integer> { - final int from; - final int to; - - ReadSource(int from, int to) { - this.from = from; - this.to = to; - } - - @Override - public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options) - throws Exception { - List<ReadSource> res = new ArrayList<>(); - FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); - int numWorkers = flinkOptions.getParallelism(); - Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0."); - - float step = 1.0f * (to - from) / numWorkers; - for (int i = 0; i < numWorkers; ++i) { - res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step))); - } - return res; - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return 8 * (to - from); - } - - @Override - public boolean producesSortedKeys(PipelineOptions options) throws Exception { - return true; - } - - @Override - public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException { - return new RangeReader(this); - } - - @Override - public void validate() {} - - @Override - public Coder<Integer> getDefaultOutputCoder() { - return BigEndianIntegerCoder.of(); - } - - private class RangeReader extends BoundedReader<Integer> { - private int current; - - public RangeReader(ReadSource source) { - this.current = source.from - 1; - } - - @Override - public boolean start() throws IOException { - return true; - } - - @Override - public boolean advance() throws IOException { - current++; - return (current < to); - } - - @Override - public Integer getCurrent() { - return current; - } - - @Override - public void close() throws IOException { - // Nothing - } - - @Override - public BoundedSource<Integer> getCurrentSource() { - return ReadSource.this; - } - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java deleted file mode 100644 index dc82d7d..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.Collections; -import java.util.List; - - -public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase { - - protected String resultPath; - - public RemoveDuplicatesEmptyITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] {}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - List<String> strings = Collections.emptyList(); - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> input = - p.apply(Create.of(strings)) - .setCoder(StringUtf8Coder.of()); - - PCollection<String> output = - input.apply(RemoveDuplicates.<String>create()); - - output.apply(TextIO.Write.to(resultPath)); - p.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java deleted file mode 100644 index 78b48b5..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.Arrays; -import java.util.List; - - -public class RemoveDuplicatesITCase extends JavaProgramTestBase { - - protected String resultPath; - - public RemoveDuplicatesITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "k1", "k5", "k2", "k3"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3"); - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> input = - p.apply(Create.of(strings)) - .setCoder(StringUtf8Coder.of()); - - PCollection<String> output = - input.apply(RemoveDuplicates.<String>create()); - - output.apply(TextIO.Write.to(resultPath)); - p.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java deleted file mode 100644 index 5cd7d78..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.io.Serializable; - -public class SideInputITCase extends JavaProgramTestBase implements Serializable { - - private static final String expected = "Hello!"; - - protected String resultPath; - - @Override - protected void testProgram() throws Exception { - - - Pipeline p = FlinkTestPipeline.createForBatch(); - - - final PCollectionView<String> sidesInput = p - .apply(Create.of(expected)) - .apply(View.<String>asSingleton()); - - p.apply(Create.of("bli")) - .apply(ParDo.of(new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - String s = c.sideInput(sidesInput); - c.output(s); - } - }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath)); - - p.run(); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java deleted file mode 100644 index ceb0a3f..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.runners.flink.examples.TFIDF; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Keys; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.net.URI; - - -public class TfIdfITCase extends JavaProgramTestBase { - - protected String resultPath; - - public TfIdfITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "a", "m", "n", "b", "c", "d"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline pipeline = FlinkTestPipeline.createForBatch(); - - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline - .apply(Create.of( - KV.of(new URI("x"), "a b c d"), - KV.of(new URI("y"), "a b c"), - KV.of(new URI("z"), "a m n"))) - .apply(new TFIDF.ComputeTfIdf()); - - PCollection<String> words = wordToUriAndTfIdf - .apply(Keys.<String>create()) - .apply(RemoveDuplicates.<String>create()); - - words.apply(TextIO.Write.to(resultPath)); - - pipeline.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java deleted file mode 100644 index c2b6fdd..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.runners.flink.examples.WordCount; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.Arrays; -import java.util.List; - - -public class WordCountITCase extends JavaProgramTestBase { - - protected String resultPath; - - public WordCountITCase(){ - } - - static final String[] WORDS_ARRAY = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - static final String[] COUNTS_ARRAY = new String[] { - "hi: 5", "there: 1", "sue: 2", "bob: 2"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - - input - .apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())) - .apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java deleted file mode 100644 index d78434b..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class WordCountJoin2ITCase extends JavaProgramTestBase { - - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1", - "bob -> Tag1: 2 Tag2: 1", - "hi -> Tag1: 5 Tag2: 3", - "hooray -> Tag1: Tag2: 1", - "please -> Tag1: Tag2: 1", - "say -> Tag1: Tag2: 1", - "sue -> Tag1: 2 Tag2: 1", - "there -> Tag1: 1 Tag2: 1", - "tim -> Tag1: Tag2: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count; - } - c.output(key + " -> " + countTag1 + countTag2); - } - } - - -}
