http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 43e458f..9cbc6b9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -17,43 +17,179 @@ */ package org.apache.beam.runners.flink.translation.functions; -import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PerKeyCombineFnRunner; +import org.apache.beam.sdk.util.PerKeyCombineFnRunners; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; -import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; +import java.util.List; +import java.util.Map; /** - * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a - * {@link org.apache.beam.sdk.transforms.Combine.PerKey} operation. This reads the input - * {@link org.apache.beam.sdk.values.KV} elements, extracts the key and merges the - * accumulators resulting from the PartialReduce which produced the input VA. + * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} + * on Flink, the first part is {@link FlinkPartialReduceFunction}. This function performs the final + * combination of the pre-combined values after a shuffle. + * + * <p>The input to {@link #reduce(Iterable, Collector)} consists of elements of the same key but + * for different windows. We have to ensure that we only combine elements of matching + * windows. 
*/ -public class FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> { +public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> + extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> { + + protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn; + + protected final DoFn<KV<K, AccumT>, KV<K, OutputT>> doFn; + + protected final WindowingStrategy<?, W> windowingStrategy; + + protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + protected final SerializedPipelineOptions serializedOptions; - private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn; + public FlinkReduceFunction( + CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { - public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; + this.combineFn = keyedCombineFn; + + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + // dummy DoFn because we need one for ProcessContext + this.doFn = new DoFn<KV<K, AccumT>, KV<K, OutputT>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + + } + }; } @Override - public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception { - Iterator<KV<K, VA>> it = values.iterator(); + public void reduce( + Iterable<WindowedValue<KV<K, AccumT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = + new FlinkProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + sideInputs); + + PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); - KV<K, VA> current = it.next(); - K k = current.getKey(); - VA accumulator = current.getValue(); + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - while (it.hasNext()) { - current = it.next(); - keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) ); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very imprudent, but correct, for now + ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, AccumT>> inputValue: elements) { + for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { + @Override + public int compare( + WindowedValue<KV<K, AccumT>> o1, + WindowedValue<KV<K, AccumT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // iterate over the elements that are sorted by window timestamp + // + final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); + + // get the first accumulator + WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); + AccumT accumulator = currentValue.getValue().getValue(); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn, + // in FlinkPartialReduceFunction we already merge the timestamps assigned + // to individual elements, here we just merge them + List<Instant> windowTimestamps = new ArrayList<>(); + windowTimestamps.add(currentValue.getTimestamp()); + + while (iterator.hasNext()) { + WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); + BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); + + if (nextWindow.equals(currentWindow)) { + // continue accumulating + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.mergeAccumulators( + key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext); + + windowTimestamps.add(nextValue.getTimestamp()); + } else { + // emit the value that we currently have + processContext = processContext.forWindowedValue(currentValue); + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + + windowTimestamps.clear(); + + currentWindow = nextWindow; + accumulator = nextValue.getValue().getValue(); + windowTimestamps.add(nextValue.getTimestamp()); + } + + // we have to keep track so that we can set the context to the right + // windowed value when windows change in the iterable + currentValue = nextValue; } - out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator))); + // if at the end of the iteration we have a change in windows + // the ProcessContext will not have been updated + processContext = processContext.forWindowedValue(currentValue); + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); } }
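The heart of the new reduce() above is a sort-then-merge-per-window pass: explode each input into its windows, sort by the window's maximum timestamp so elements of the same window become adjacent, then fold accumulators in a single linear sweep and emit whenever the window changes. A minimal standalone sketch of that pattern follows; the Windowed class, the Long accumulators, and merge() are illustrative stand-ins (merge() plays the role of CombineFn#mergeAccumulators), and the OutputTimeFn/ProcessContext plumbing is omitted.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowMergeSketch {

  // Illustrative stand-in for WindowedValue: a value plus its window's end timestamp.
  static class Windowed<T> {
    final T value;
    final long windowMaxTimestamp;

    Windowed(T value, long windowMaxTimestamp) {
      this.value = value;
      this.windowMaxTimestamp = windowMaxTimestamp;
    }
  }

  // Illustrative stand-in for CombineFn#mergeAccumulators.
  static Long merge(Long a, Long b) {
    return a + b;
  }

  // Assumes a non-empty input, just like the reduce() above, which calls
  // iterator.next() before checking hasNext().
  static List<Windowed<Long>> reducePerWindow(List<Windowed<Long>> elements) {
    List<Windowed<Long>> sorted = new ArrayList<>(elements);
    // Sorting by window end makes all elements of the same window adjacent.
    sorted.sort(Comparator.comparingLong(w -> w.windowMaxTimestamp));

    List<Windowed<Long>> output = new ArrayList<>();
    long currentWindow = sorted.get(0).windowMaxTimestamp;
    Long accumulator = sorted.get(0).value;
    for (Windowed<Long> next : sorted.subList(1, sorted.size())) {
      if (next.windowMaxTimestamp == currentWindow) {
        // same window: keep accumulating
        accumulator = merge(accumulator, next.value);
      } else {
        // window changed: emit the finished accumulator and start over
        output.add(new Windowed<>(accumulator, currentWindow));
        currentWindow = next.windowMaxTimestamp;
        accumulator = next.value;
      }
    }
    // emit the final accumulator
    output.add(new Windowed<>(accumulator, currentWindow));
    return output;
  }
}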
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java new file mode 100644 index 0000000..451b31b --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; + +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map} + * from window to side input. 
+ */ +public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow> + implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> { + + PCollectionView<ViewT> view; + + public SideInputInitializer(PCollectionView<ViewT> view) { + this.view = view; + } + + @Override + public Map<BoundedWindow, ViewT> initializeBroadcastVariable( + Iterable<WindowedValue<ElemT>> inputValues) { + + // first partition into windows + Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>(); + for (WindowedValue<ElemT> value: inputValues) { + for (BoundedWindow window: value.getWindows()) { + List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window); + if (windowedValues == null) { + windowedValues = new ArrayList<>(); + partitionedElements.put(window, windowedValues); + } + windowedValues.add(value); + } + } + + Map<BoundedWindow, ViewT> resultMap = new HashMap<>(); + + for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements: + partitionedElements.entrySet()) { + + @SuppressWarnings("unchecked") + Iterable<WindowedValue<?>> elementsIterable = + (List<WindowedValue<?>>) (List<?>) elements.getValue(); + + resultMap.put(elements.getKey(), view.fromIterableInternal(elementsIterable)); + } + + return resultMap; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java deleted file mode 100644 index cc6fd8b..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.VarInt; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - -/** - * A UnionCoder encodes RawUnionValues. 
- * - * This file copied from {@link org.apache.beam.sdk.transforms.join.UnionCoder} - */ -@SuppressWarnings("serial") -public class UnionCoder extends StandardCoder<RawUnionValue> { - // TODO: Think about how to integrate this with a schema object (i.e. - // a tuple of tuple tags). - /** - * Builds a union coder with the given list of element coders. This list - * corresponds to a mapping of union tag to Coder. Union tags start at 0. - */ - public static UnionCoder of(List<Coder<?>> elementCoders) { - return new UnionCoder(elementCoders); - } - - @JsonCreator - public static UnionCoder jsonOf( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> elements) { - return UnionCoder.of(elements); - } - - private int getIndexForEncoding(RawUnionValue union) { - if (union == null) { - throw new IllegalArgumentException("cannot encode a null tagged union"); - } - int index = union.getUnionTag(); - if (index < 0 || index >= elementCoders.size()) { - throw new IllegalArgumentException( - "union value index " + index + " not in range [0.." + - (elementCoders.size() - 1) + "]"); - } - return index; - } - - @SuppressWarnings("unchecked") - @Override - public void encode( - RawUnionValue union, - OutputStream outStream, - Context context) - throws IOException { - int index = getIndexForEncoding(union); - // Write out the union tag. - VarInt.encode(index, outStream); - - // Write out the actual value. - Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - coder.encode( - union.getValue(), - outStream, - context); - } - - @Override - public RawUnionValue decode(InputStream inStream, Context context) - throws IOException { - int index = VarInt.decodeInt(inStream); - Object value = elementCoders.get(index).decode(inStream, context); - return new RawUnionValue(index, value); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return null; - } - - @Override - public List<? extends Coder<?>> getComponents() { - return elementCoders; - } - - /** - * Since this coder uses elementCoders.get(index) and coders that are known to run in constant - * time, we defer the return value to that coder. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) { - int index = getIndexForEncoding(union); - @SuppressWarnings("unchecked") - Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - return coder.isRegisterByteSizeObserverCheap(union.getValue(), context); - } - - /** - * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder. - */ - @Override - public void registerByteSizeObserver( - RawUnionValue union, ElementByteSizeObserver observer, Context context) - throws Exception { - int index = getIndexForEncoding(union); - // Write out the union tag. - observer.update(VarInt.getLength(index)); - // Write out the actual value. 
- @SuppressWarnings("unchecked") - Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - coder.registerByteSizeObserver(union.getValue(), observer, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - private final List<Coder<?>> elementCoders; - - private UnionCoder(List<Coder<?>> elementCoders) { - this.elementCoders = elementCoders; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic( - "UnionCoder is only deterministic if all element coders are", - elementCoders); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java index 895ecef..4434cf8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java @@ -18,7 +18,8 @@ package org.apache.beam.runners.flink.translation.types; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.WindowedValue; import com.google.common.base.Preconditions; @@ -71,9 +72,6 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi @Override @SuppressWarnings("unchecked") public TypeSerializer<T> createSerializer(ExecutionConfig config) { - if (coder instanceof VoidCoder) { - return (TypeSerializer<T>) new VoidCoderTypeSerializer(); - } return new CoderTypeSerializer<>(coder); } @@ -84,8 +82,12 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CoderTypeInformation that = (CoderTypeInformation) o; @@ -113,6 +115,11 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi @Override public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { - return new CoderComparator<>(coder); + WindowedValue.WindowedValueCoder windowCoder = (WindowedValue.WindowedValueCoder) coder; + if (windowCoder.getValueCoder() instanceof KvCoder) { + return new KvCoderComperator(windowCoder); + } else { + return new CoderComparator<>(coder); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index c6f3921..097316b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -33,7 +33,7 @@ import java.io.ObjectInputStream; /** * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for - * Dataflow {@link org.apache.beam.sdk.coders.Coder}s + * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}. */ public class CoderTypeSerializer<T> extends TypeSerializer<T> { @@ -128,14 +128,20 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { } @Override - public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { + public void copy( + DataInputView dataInputView, + DataOutputView dataOutputView) throws IOException { serialize(deserialize(dataInputView), dataOutputView); } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CoderTypeSerializer that = (CoderTypeSerializer) o; return coder.equals(that.coder); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java index 6f0c651..79b127d 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.types; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -31,14 +33,13 @@ import java.io.IOException; import java.io.ObjectInputStream; /** - * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for - * {@link org.apache.beam.sdk.coders.KvCoder}. We have a special comparator + * Flink {@link TypeComparator} for {@link KvCoder}. We have a special comparator * for {@link KV} that always compares on the key only. 
*/ -public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { +public class KvCoderComperator <K, V> extends TypeComparator<WindowedValue<KV<K, V>>> { - private KvCoder<K, V> coder; - private Coder<K> keyCoder; + private final WindowedValue.WindowedValueCoder<KV<K, V>> coder; + private final Coder<K> keyCoder; // We use these for internal encoding/decoding for creating copies and comparing // serialized forms using a Coder @@ -52,9 +53,10 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { // For deserializing the key private transient DataInputViewWrapper inputWrapper; - public KvCoderComperator(KvCoder<K, V> coder) { + public KvCoderComperator(WindowedValue.WindowedValueCoder<KV<K, V>> coder) { this.coder = coder; - this.keyCoder = coder.getKeyCoder(); + KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder(); + this.keyCoder = kvCoder.getKeyCoder(); buffer1 = new InspectableByteArrayOutputStream(); buffer2 = new InspectableByteArrayOutputStream(); @@ -74,8 +76,8 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public int hash(KV<K, V> record) { - K key = record.getKey(); + public int hash(WindowedValue<KV<K, V>> record) { + K key = record.getValue().getKey(); if (key != null) { return key.hashCode(); } else { @@ -84,27 +86,27 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public void setReference(KV<K, V> toCompare) { + public void setReference(WindowedValue<KV<K, V>> toCompare) { referenceBuffer.reset(); try { - keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER); + keyCoder.encode(toCompare.getValue().getKey(), referenceBuffer, Coder.Context.OUTER); } catch (IOException e) { throw new RuntimeException("Could not set reference " + toCompare + ": " + e); } } @Override - public boolean equalToReference(KV<K, V> candidate) { + public boolean equalToReference(WindowedValue<KV<K, V>> candidate) { try { buffer2.reset(); - keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER); + keyCoder.encode(candidate.getValue().getKey(), buffer2, Coder.Context.OUTER); byte[] arr = referenceBuffer.getBuffer(); byte[] arrOther = buffer2.getBuffer(); if (referenceBuffer.size() != buffer2.size()) { return false; } int len = buffer2.size(); - for(int i = 0; i < len; i++ ) { + for (int i = 0; i < len; i++) { if (arr[i] != arrOther[i]) { return false; } @@ -116,8 +118,9 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public int compareToReference(TypeComparator<KV<K, V>> other) { - InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer; + public int compareToReference(TypeComparator<WindowedValue<KV<K, V>>> other) { + InspectableByteArrayOutputStream otherReferenceBuffer = + ((KvCoderComperator<K, V>) other).referenceBuffer; byte[] arr = referenceBuffer.getBuffer(); byte[] arrOther = otherReferenceBuffer.getBuffer(); @@ -135,19 +138,19 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { @Override - public int compare(KV<K, V> first, KV<K, V> second) { + public int compare(WindowedValue<KV<K, V>> first, WindowedValue<KV<K, V>> second) { try { buffer1.reset(); buffer2.reset(); - keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER); - keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER); + keyCoder.encode(first.getValue().getKey(), buffer1, Coder.Context.OUTER); + keyCoder.encode(second.getValue().getKey(), buffer2, 
Coder.Context.OUTER); byte[] arr = buffer1.getBuffer(); byte[] arrOther = buffer2.getBuffer(); if (buffer1.size() != buffer2.size()) { return buffer1.size() - buffer2.size(); } int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { + for (int i = 0; i < len; i++) { if (arr[i] != arrOther[i]) { return arr[i] - arrOther[i]; } @@ -159,38 +162,19 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - + public int compareSerialized( + DataInputView firstSource, + DataInputView secondSource) throws IOException { inputWrapper.setInputView(firstSource); - K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); + WindowedValue<KV<K, V>> first = coder.decode(inputWrapper, Coder.Context.NESTED); inputWrapper.setInputView(secondSource); - K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER); - keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } + WindowedValue<KV<K, V>> second = coder.decode(inputWrapper, Coder.Context.NESTED); + return compare(first, second); } @Override public boolean supportsNormalizedKey() { - return true; + return false; } @Override @@ -209,12 +193,18 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) { + public void putNormalizedKey( + WindowedValue<KV<K, V>> record, + MemorySegment target, + int offset, + int numBytes) { + buffer1.reset(); try { - keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED); + keyCoder.encode(record.getValue().getKey(), buffer1, Coder.Context.NESTED); } catch (IOException e) { - throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); + throw new RuntimeException( + "Could not serialize " + record + " using coder " + coder + ": " + e); } final byte[] data = buffer1.getBuffer(); final int limit = offset + numBytes; @@ -231,12 +221,16 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException { + public void writeWithKeyNormalization( + WindowedValue<KV<K, V>> record, + DataOutputView target) throws IOException { throw new UnsupportedOperationException(); } @Override - public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException { + public WindowedValue<KV<K, V>> readWithKeyDenormalization( + WindowedValue<KV<K, V>> reuse, + DataInputView source) throws IOException { throw new UnsupportedOperationException(); } @@ -246,14 +240,14 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { } @Override - public TypeComparator<KV<K, V>> duplicate() { + public TypeComparator<WindowedValue<KV<K, V>>> duplicate() { return new KvCoderComperator<>(coder); } @Override public int extractKeys(Object record, Object[] target, 
int index) { - KV<K, V> kv = (KV<K, V>) record; - K k = kv.getKey(); + WindowedValue<KV<K, V>> kv = (WindowedValue<KV<K, V>>) record; + K k = kv.getValue().getKey(); target[index] = k; return 1; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java index 74f3821..ba53f64 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink.translation.types; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import com.google.common.base.Preconditions; @@ -31,27 +32,32 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.List; /** - * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for - * Dataflow {@link org.apache.beam.sdk.coders.KvCoder}. + * Flink {@link TypeInformation} for {@link KvCoder}. This creates a special comparator + * for {@link KV} that always compares on the key only. */ -public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { +public class KvCoderTypeInformation<K, V> extends CompositeType<WindowedValue<KV<K, V>>> { - private KvCoder<K, V> coder; + private final WindowedValue.WindowedValueCoder<KV<K, V>> coder; +// private KvCoder<K, V> coder; // We don't have the Class, so we have to pass null here. What a shame... 
- private static Object DUMMY = new Object(); + private static Object dummy = new Object(); @SuppressWarnings("unchecked") - public KvCoderTypeInformation(KvCoder<K, V> coder) { - super(((Class<KV<K,V>>) DUMMY.getClass())); + public KvCoderTypeInformation(WindowedValue.WindowedValueCoder<KV<K, V>> coder) { + super((Class) dummy.getClass()); this.coder = coder; Preconditions.checkNotNull(coder); } @Override @SuppressWarnings("unchecked") - public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) { - return new KvCoderComperator((KvCoder) coder); + public TypeComparator<WindowedValue<KV<K, V>>> createComparator( + int[] logicalKeyFields, + boolean[] orders, + int logicalFieldOffset, + ExecutionConfig config) { + return new KvCoderComperator(coder); } @Override @@ -71,7 +77,7 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { @Override @SuppressWarnings("unchecked") - public Class<KV<K, V>> getTypeClass() { + public Class<WindowedValue<KV<K, V>>> getTypeClass() { return privateGetTypeClass(); } @@ -87,7 +93,7 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { @Override @SuppressWarnings("unchecked") - public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) { + public TypeSerializer<WindowedValue<KV<K, V>>> createSerializer(ExecutionConfig config) { return new CoderTypeSerializer<>(coder); } @@ -98,8 +104,12 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } KvCoderTypeInformation that = (KvCoderTypeInformation) o; @@ -122,10 +132,11 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { @Override @SuppressWarnings("unchecked") public <X> TypeInformation<X> getTypeAt(int pos) { + KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder(); if (pos == 0) { - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder()); + return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getKeyCoder()); } else if (pos == 1) { - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder()); + return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getValueCoder()); } else { throw new RuntimeException("Invalid field position " + pos); } @@ -134,11 +145,12 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { @Override @SuppressWarnings("unchecked") public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder(); switch (fieldExpression) { case "key": - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder()); + return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getKeyCoder()); case "value": - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder()); + return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getValueCoder()); default: throw new UnsupportedOperationException("Only KvCoder has fields."); } @@ -162,17 +174,24 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { } @Override - public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { - CoderTypeInformation keyTypeInfo = new 
CoderTypeInformation<>(coder.getKeyCoder()); + public void getFlatFields( + String fieldExpression, + int offset, + List<FlatFieldDescriptor> result) { + KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder(); + + CoderTypeInformation keyTypeInfo = + new CoderTypeInformation<>(kvCoder.getKeyCoder()); result.add(new FlatFieldDescriptor(0, keyTypeInfo)); } @Override - protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() { + protected TypeComparatorBuilder<WindowedValue<KV<K, V>>> createTypeComparatorBuilder() { return new KvCoderTypeComparatorBuilder(); } - private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> { + private class KvCoderTypeComparatorBuilder + implements TypeComparatorBuilder<WindowedValue<KV<K, V>>> { @Override public void initializeTypeComparatorBuilder(int size) {} @@ -181,7 +200,7 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { public void addComparatorField(int fieldId, TypeComparator<?> comparator) {} @Override - public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) { + public TypeComparator<WindowedValue<KV<K, V>>> createTypeComparator(ExecutionConfig config) { return new KvCoderComperator<>(coder); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java deleted file mode 100644 index 7b48208..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; - -/** - * Special Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for - * {@link org.apache.beam.sdk.coders.VoidCoder}. We need this because Flink does not - * allow returning {@code null} from an input reader. We return a {@link VoidValue} instead - * that behaves like a {@code null}, hopefully. 
- */ -public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> { - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public VoidCoderTypeSerializer duplicate() { - return this; - } - - @Override - public VoidValue createInstance() { - return VoidValue.INSTANCE; - } - - @Override - public VoidValue copy(VoidValue from) { - return from; - } - - @Override - public VoidValue copy(VoidValue from, VoidValue reuse) { - return from; - } - - @Override - public int getLength() { - return 0; - } - - @Override - public void serialize(VoidValue record, DataOutputView target) throws IOException { - target.writeByte(1); - } - - @Override - public VoidValue deserialize(DataInputView source) throws IOException { - source.readByte(); - return VoidValue.INSTANCE; - } - - @Override - public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - source.readByte(); - target.writeByte(1); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof VoidCoderTypeSerializer) { - VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj; - return other.canEqual(this); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof VoidCoderTypeSerializer; - } - - @Override - public int hashCode() { - return 0; - } - - public static class VoidValue { - private VoidValue() {} - - public static VoidValue INSTANCE = new VoidValue(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java deleted file mode 100644 index e5567d3..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; - -import com.google.common.collect.Lists; - -import org.apache.flink.api.common.accumulators.Accumulator; - -import java.io.Serializable; - -/** - * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} - * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using - * the combine function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} - * operation. - */ -public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> { - - private AA aa; - private Combine.CombineFn<? super AI, AA, AR> combiner; - - public CombineFnAggregatorWrapper() { - } - - public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) { - this.combiner = combiner; - this.aa = combiner.createAccumulator(); - } - - @Override - public void add(AI value) { - combiner.addInput(aa, value); - } - - @Override - public Serializable getLocalValue() { - return (Serializable) combiner.extractOutput(aa); - } - - @Override - public void resetLocal() { - aa = combiner.createAccumulator(); - } - - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator<AI, Serializable> other) { - aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa)); - } - - @Override - public Accumulator<AI, Serializable> clone() { - // copy it by merging - AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa)); - CombineFnAggregatorWrapper<AI, AA, AR> result = new - CombineFnAggregatorWrapper<>(combiner); - result.aa = aaCopy; - return result; - } - - @Override - public void addValue(AI value) { - add(value); - } - - @Override - public String getName() { - return "CombineFn: " + combiner.toString(); - } - - @Override - public Combine.CombineFn getCombineFn() { - return combiner; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java index eb32fa2..82d3fb8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java @@ -33,20 +33,21 @@ import java.io.Serializable; * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} * operation. 
*/ -public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> { +public class SerializableFnAggregatorWrapper<InputT, OutputT> + implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> { - private AO aa; - private Combine.CombineFn<AI, ?, AO> combiner; + private OutputT aa; + private Combine.CombineFn<InputT, ?, OutputT> combiner; - public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) { + public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) { this.combiner = combiner; resetLocal(); } - + @Override @SuppressWarnings("unchecked") - public void add(AI value) { - this.aa = combiner.apply(ImmutableList.of((AI) aa, value)); + public void add(InputT value) { + this.aa = combiner.apply(ImmutableList.of((InputT) aa, value)); } @Override @@ -56,17 +57,17 @@ public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, A @Override public void resetLocal() { - this.aa = combiner.apply(ImmutableList.<AI>of()); + this.aa = combiner.apply(ImmutableList.<InputT>of()); } @Override @SuppressWarnings("unchecked") - public void merge(Accumulator<AI, Serializable> other) { - this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue())); + public void merge(Accumulator<InputT, Serializable> other) { + this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue())); } @Override - public void addValue(AI value) { + public void addValue(InputT value) { add(value); } @@ -76,15 +77,15 @@ public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, A } @Override - public Combine.CombineFn<AI, ?, AO> getCombineFn() { + public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() { return combiner; } @Override - public Accumulator<AI, Serializable> clone() { + public Accumulator<InputT, Serializable> clone() { // copy it by merging - AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa)); - SerializableFnAggregatorWrapper<AI, AO> result = new + OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa)); + SerializableFnAggregatorWrapper<InputT, OutputT> result = new SerializableFnAggregatorWrapper<>(combiner); result.aa = resultCopy; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java index 53e544d..c0a7132 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; @@ -31,10 +32,11 @@ import java.io.IOException; import java.lang.reflect.Field; /** - * Wrapper class to use generic Write.Bound transforms as sinks. 
+ * Wrapper for executing a {@link Sink} on Flink as an {@link OutputFormat}. + * * @param <T> The type of the incoming records. */ -public class SinkOutputFormat<T> implements OutputFormat<T> { +public class SinkOutputFormat<T> implements OutputFormat<WindowedValue<T>> { private final Sink<T> sink; @@ -75,9 +77,9 @@ public class SinkOutputFormat<T> implements OutputFormat<T> { } @Override - public void writeRecord(T record) throws IOException { + public void writeRecord(WindowedValue<T> record) throws IOException { try { - writer.write(record); + writer.write(record.getValue()); } catch (Exception e) { throw new IOException("Couldn't write record.", e); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index debd1a1..1d06b1a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -21,12 +21,16 @@ import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,10 +39,10 @@ import java.util.List; /** - * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a - * Dataflow {@link org.apache.beam.sdk.io.Source}. + * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}. 
*/ -public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> { +public class SourceInputFormat<T> + implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> { private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); private final BoundedSource<T> initialSource; @@ -122,12 +126,16 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> } @Override - public T nextRecord(T t) throws IOException { + public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException { if (inputAvailable) { final T current = reader.getCurrent(); + final Instant timestamp = reader.getCurrentTimestamp(); // advance reader to have a record ready next time inputAvailable = reader.advance(); - return current; + return WindowedValue.of( + current, + timestamp, + GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); } return null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java index 3bf566b..6b69d54 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -54,7 +53,7 @@ public class FlinkGroupByKeyWrapper { @Override public K getKey(WindowedValue<KV<K, V>> value) throws Exception { - return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE : + return isKeyVoid ? 
(K) VoidValue.INSTANCE : value.getValue().getKey(); } @@ -64,4 +63,11 @@ public class FlinkGroupByKeyWrapper { } }); } + + // special type to return as key for null key + public static class VoidValue { + private VoidValue() {} + + public static VoidValue INSTANCE = new VoidValue(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java index d6aff7d..8cd8351 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; -import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -47,17 +46,11 @@ public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN @Override public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { - @SuppressWarnings("unchecked") - OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; for (byte[] element : elements) { ByteArrayInputStream bai = new ByteArrayInputStream(element); OUT outValue = coder.decode(bai, Coder.Context.OUTER); - if (outValue == null) { - out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } else { - out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } + out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } out.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java deleted file mode 100644 index 113fee0..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; - -import com.google.common.base.Joiner; - -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class AvroITCase extends JavaProgramTestBase { - - protected String resultPath; - protected String tmpPath; - - public AvroITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "Joe red 3", - "Mary blue 4", - "Mark green 1", - "Julia purple 5" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - tmpPath = getTempDirPath("tmp"); - - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(tmpPath, resultPath); - } - - private static void runProgram(String tmpPath, String resultPath) { - Pipeline p = FlinkTestPipeline.createForBatch(); - - p - .apply(Create.of( - new User("Joe", 3, "red"), - new User("Mary", 4, "blue"), - new User("Mark", 1, "green"), - new User("Julia", 5, "purple")) - .withCoder(AvroCoder.of(User.class))) - - .apply(AvroIO.Write.to(tmpPath) - .withSchema(User.class)); - - p.run(); - - p = FlinkTestPipeline.createForBatch(); - - p - .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation()) - - .apply(ParDo.of(new DoFn<User, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - User u = c.element(); - String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber(); - c.output(result); - } - })) - - .apply(TextIO.Write.to(resultPath)); - - p.run(); - } - - private static class User { - - private String name; - private int favoriteNumber; - private String favoriteColor; - - public User() {} - - public User(String name, int favoriteNumber, String favoriteColor) { - this.name = name; - this.favoriteNumber = favoriteNumber; - this.favoriteColor = favoriteColor; - } - - public String getName() { - return name; - } - - public String getFavoriteColor() { - return favoriteColor; - } - - public int getFavoriteNumber() { - return favoriteNumber; - } - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java deleted file mode 100644 index ac0a3d7..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
deleted file mode 100644
index ac0a3d7..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class FlattenizeITCase extends JavaProgramTestBase {
-
-  private String resultPath;
-  private String resultPath2;
-
-  private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
-  private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
-  private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-    resultPath2 = getTempDirPath("result2");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    String join = Joiner.on('\n').join(words);
-    String join2 = Joiner.on('\n').join(words2);
-    String join3 = Joiner.on('\n').join(words3);
-    compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
-    compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
-  }
-
-
-  @Override
-  protected void testProgram() throws Exception {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> p1 = p.apply(Create.of(words));
-    PCollection<String> p2 = p.apply(Create.of(words2));
-
-    PCollectionList<String> list = PCollectionList.of(p1).and(p2);
-
-    list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
-
-    PCollection<String> p3 = p.apply(Create.of(words3));
-
-    PCollectionList<String> list2 = list.and(p3);
-
-    list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
-
-    p.run();
-  }
-
-}
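FlattenizeITCase exercised Flatten over a growing PCollectionList; note that the list is immutable, so `list.and(p3)` produces a new, larger list rather than modifying `list`. A minimal sketch, assuming string collections `p1`, `p2`, `p3` from the same pipeline:

    // Flatten concatenates every collection in the list into one PCollection.
    PCollectionList<String> list = PCollectionList.of(p1).and(p2);
    PCollection<String> firstTwo = list.apply(Flatten.<String>pCollections());

    // and(...) is non-destructive: `list` remains usable after this call.
    PCollection<String> allThree =
        list.and(p3).apply(Flatten.<String>pCollections());
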
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
deleted file mode 100644
index 47685b6..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.util.JoinExamples;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-/**
- * Unfortunately we need to copy the code from the Dataflow SDK because it is not public there.
- */
-public class JoinExamplesITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public JoinExamplesITCase(){
-  }
-
-  private static final TableRow row1 = new TableRow()
-      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
-      .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
-  private static final TableRow row2 = new TableRow()
-      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
-      .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
-  private static final TableRow row3 = new TableRow()
-      .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
-      .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
-  static final TableRow[] EVENTS = new TableRow[] {
-      row1, row2, row3
-  };
-  static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
-
-  private static final TableRow cc1 = new TableRow()
-      .set("FIPSCC", "VM").set("HumanName", "Vietnam");
-  private static final TableRow cc2 = new TableRow()
-      .set("FIPSCC", "BE").set("HumanName", "Belgium");
-  static final TableRow[] CCS = new TableRow[] {
-      cc1, cc2
-  };
-  static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
-
-  static final String[] JOINED_EVENTS = new String[] {
-      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
-          + "url: http://www.chicagotribune.com",
-      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
-          + "url: http://cnn.com",
-      "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
-          + "url: http://cnn.com"
-  };
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
-    PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
-
-    PCollection<String> output = JoinExamples.joinEvents(input1, input2);
-
-    output.apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}
-
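JoinExamplesITCase delegated the actual join to the copied JoinExamples.joinEvents utility, whose underlying pattern is a CoGroupByKey over two keyed collections. A hedged sketch of that pattern (collection names here are illustrative, not the exact code inside JoinExamples):

    // Two collections keyed by country code, co-grouped under distinct tags.
    final TupleTag<String> eventTag = new TupleTag<String>();
    final TupleTag<String> countryTag = new TupleTag<String>();
    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(eventTag, eventsByCountry)
            .and(countryTag, countriesByCode)
            .apply(CoGroupByKey.<String>create());
    // Each CoGbkResult exposes the values from both inputs for one key,
    // via result.getAll(eventTag) and result.getOnly(countryTag).
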
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
deleted file mode 100644
index 4d66fa4..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable {
-
-  protected String resultPath;
-
-  protected final String expected = "test";
-
-  public MaybeEmptyTestITCase() {
-  }
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(expected, resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-        .apply(ParDo.of(
-            new DoFn<Void, String>() {
-              @Override
-              public void processElement(DoFn<Void, String>.ProcessContext c) {
-                c.output(expected);
-              }
-            })).apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-
-}
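MaybeEmptyTestITCase pins the coder explicitly because a single null Void element gives Create nothing to infer a coder from. The relevant idiom, in isolation:

    // A null Void element needs an explicit coder; VoidCoder encodes Void.
    p.apply(Create.of((Void) null)).setCoder(VoidCoder.of());
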
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
deleted file mode 100644
index a2ef4e2..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable {
-
-  private String resultPath;
-
-  private static String[] expectedWords = {"MAAA", "MAAFOOO"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
-
-    // Select words whose length is below a cut off,
-    // plus the lengths of words that are above the cut off.
-    // Also select words starting with "MARKER".
-    final int wordLengthCutOff = 3;
-    // Create tags to use for the main and side outputs.
-    final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
-    final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
-    final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
-
-    PCollectionTuple results =
-        words.apply(ParDo
-            .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
-                .and(markedWordsTag))
-            .of(new DoFn<String, String>() {
-              final TupleTag<String> specialWordsTag = new TupleTag<String>() {
-              };
-
-              public void processElement(ProcessContext c) {
-                String word = c.element();
-                if (word.length() <= wordLengthCutOff) {
-                  c.output(word);
-                } else {
-                  c.sideOutput(wordLengthsAboveCutOffTag, word.length());
-                }
-                if (word.startsWith("MAA")) {
-                  c.sideOutput(markedWordsTag, word);
-                }
-
-                if (word.startsWith("SPECIAL")) {
-                  c.sideOutput(specialWordsTag, word);
-                }
-              }
-            }));
-
-    // Extract the PCollection results, by tag.
-    PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
-    PCollection<Integer> wordLengthsAboveCutOff = results.get(wordLengthsAboveCutOffTag);
-    PCollection<String> markedWords = results.get(markedWordsTag);
-
-    markedWords.apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}
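Note that `specialWordsTag` in the deleted test is created inside the DoFn but never declared in withOutputTags, so elements sent to it are silently dropped; only the main output tag and the tags in the TupleTagList surface in the resulting PCollectionTuple. A minimal sketch of declaring and retrieving a side output (tag names and `fn` here are illustrative):

    // Tags must be declared up front for their outputs to be retrievable.
    final TupleTag<String> mainTag = new TupleTag<String>(){};
    final TupleTag<Integer> lengthTag = new TupleTag<Integer>(){};
    PCollectionTuple outputs = words.apply(ParDo
        .withOutputTags(mainTag, TupleTagList.of(lengthTag))
        .of(fn));
    PCollection<Integer> lengths = outputs.get(lengthTag);  // same tag instance
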
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 66c959e..bb79b27 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -28,6 +28,9 @@ import com.google.common.base.Joiner;
 
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.io.File;
+import java.net.URI;
+
 /**
  * Reads from a bounded source in batch execution.
  */
@@ -44,6 +47,13 @@ public class ReadSourceITCase extends JavaProgramTestBase {
   @Override
   protected void preSubmit() throws Exception {
     resultPath = getTempDirPath("result");
+
+    // need to create the dir, otherwise Beam sinks don't
+    // work for these tests
+
+    if (!new File(new URI(resultPath)).mkdirs()) {
+      throw new RuntimeException("Could not create output dir.");
+    }
   }
 
   @Override
@@ -56,7 +66,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
     runProgram(resultPath);
   }
 
-  private static void runProgram(String resultPath) {
+  private static void runProgram(String resultPath) throws Exception {
 
     Pipeline p = FlinkTestPipeline.createForBatch();
 
@@ -69,7 +79,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
       }
     }));
 
-    result.apply(TextIO.Write.to(resultPath));
+    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
 
    p.run();
  }
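The ReadSourceITCase change works around the fact that the Beam file sinks used here expect the target directory to exist: preSubmit now materializes it from the URI-style temp path, and the write goes to a "part" prefix inside that directory. In isolation, the new setup step is:

    // resultPath is a URI string, so it is converted to a File before the
    // output directory that the sink writes into is created.
    File outDir = new File(new URI(resultPath));
    if (!outDir.mkdirs()) {
      throw new RuntimeException("Could not create output dir.");
    }
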
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
deleted file mode 100644
index 471d326..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Collections;
-import java.util.List;
-
-
-public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public RemoveDuplicatesEmptyITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    List<String> strings = Collections.emptyList();
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings))
-            .setCoder(StringUtf8Coder.of());
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    output.apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
deleted file mode 100644
index 0544f20..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class RemoveDuplicatesITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public RemoveDuplicatesITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "k1", "k5", "k2", "k3"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings))
-            .setCoder(StringUtf8Coder.of());
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    output.apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-}
-
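Both RemoveDuplicates tests (empty and non-empty input) verify results with compareResultsByLinesInMemory, which does not depend on line order; this matters because RemoveDuplicates guarantees one occurrence per distinct element but no particular ordering. The transform itself is a one-liner, assuming a string input collection:

    // Keeps one occurrence per distinct element; output order is undefined.
    PCollection<String> distinct =
        input.apply(RemoveDuplicates.<String>create());
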
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
deleted file mode 100644
index 2c7c65e..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class SideInputITCase extends JavaProgramTestBase implements Serializable {
-
-  private static final String expected = "Hello!";
-
-  protected String resultPath;
-
-  @Override
-  protected void testProgram() throws Exception {
-
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-
-    final PCollectionView<String> sidesInput = p
-        .apply(Create.of(expected))
-        .apply(View.<String>asSingleton());
-
-    p.apply(Create.of("bli"))
-        .apply(ParDo.of(new DoFn<String, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            String s = c.sideInput(sidesInput);
-            c.output(s);
-          }
-        }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(expected, resultPath);
-  }
-}
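SideInputITCase shows the two halves of the side-input contract: the view is built with View.asSingleton(), and it must be both registered on the ParDo via withSideInputs and read inside the DoFn with c.sideInput(view). The pattern, reduced to its essentials:

    // Build a singleton view, register it, and read it once per element.
    final PCollectionView<String> view =
        p.apply(Create.of("Hello!")).apply(View.<String>asSingleton());
    p.apply(Create.of("bli"))
        .apply(ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(c.sideInput(view));  // same view instance as registered
          }
        }).withSideInputs(view));
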
