Repository: beam Updated Branches: refs/heads/master ef56ea495 -> 1866a0113
[BEAM-2401] Update Flink Runner to Flink 1.3.0 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fbc6cc59 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fbc6cc59 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fbc6cc59 Branch: refs/heads/master Commit: fbc6cc59bff93dfcf8676f874870a43eeb228c15 Parents: ef56ea4 Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Fri Jun 2 11:31:44 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sun Jun 4 08:18:20 2017 +0200 ---------------------------------------------------------------------- runners/flink/pom.xml | 2 +- .../translation/types/CoderTypeSerializer.java | 55 ++++++++++++++++++++ .../types/EncodedValueSerializer.java | 18 +------ .../state/FlinkBroadcastStateInternals.java | 29 +++++------ .../streaming/UnboundedSourceWrapperTest.java | 37 +++++++++++++ 5 files changed, 109 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index fb0a67c..92f95a0 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -31,7 +31,7 @@ <packaging>jar</packaging> <properties> - <flink.version>1.2.1</flink.version> + <flink.version>1.3.0</flink.version> </properties> <profiles> http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index e003119..bea562e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -24,7 +24,9 @@ import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -129,4 +131,57 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { public int hashCode() { return coder.hashCode(); } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new CoderTypeSerializerConfigSnapshot<>(coder); + } + + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof CoderTypeSerializerConfigSnapshot) { + if (coder.equals(((CoderTypeSerializerConfigSnapshot<?>) configSnapshot).coder)) { + return CompatibilityResult.compatible(); + } + } + return CompatibilityResult.requiresMigration(); + } + + /** + * TypeSerializerConfigSnapshot of CoderTypeSerializer. + */ + public static class CoderTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + private Coder<T> coder; + + public CoderTypeSerializerConfigSnapshot(Coder<T> coder) { + this.coder = coder; + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CoderTypeSerializerConfigSnapshot<?> that = (CoderTypeSerializerConfigSnapshot<?>) o; + + return coder != null ? coder.equals(that.coder) : that.coder == null; + } + + @Override + public int hashCode() { + return coder.hashCode(); + } + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java index c3b9794..c40eb46 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java @@ -20,13 +20,14 @@ package org.apache.beam.runners.flink.translation.types; import java.io.IOException; import org.apache.beam.sdk.coders.Coder; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; /** * {@link TypeSerializer} for values that were encoded using a {@link Coder}. */ -public final class EncodedValueSerializer extends TypeSerializer<byte[]> { +public final class EncodedValueSerializer extends TypeSerializerSingleton<byte[]> { private static final long serialVersionUID = 1L; @@ -57,7 +58,6 @@ public final class EncodedValueSerializer extends TypeSerializer<byte[]> { return -1; } - @Override public void serialize(byte[] record, DataOutputView target) throws IOException { if (record == null) { @@ -94,18 +94,4 @@ public final class EncodedValueSerializer extends TypeSerializer<byte[]> { return obj instanceof EncodedValueSerializer; } - @Override - public int hashCode() { - return this.getClass().hashCode(); - } - - @Override - public boolean equals(Object obj) { - return obj instanceof EncodedValueSerializer; - } - - @Override - public TypeSerializer<byte[]> duplicate() { - return this; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index f44e668..6cc2429 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -49,11 +49,11 @@ import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.runtime.state.OperatorStateBackend; /** - * {@link StateInternals} that uses a Flink {@link DefaultOperatorStateBackend} + * {@link StateInternals} that uses a Flink {@link OperatorStateBackend} * to manage the broadcast state. * The state is the same on all parallel instances of the operator. * So we just need store state of operator-0 in OperatorStateBackend. @@ -64,13 +64,12 @@ import org.apache.flink.runtime.state.OperatorStateBackend; public class FlinkBroadcastStateInternals<K> implements StateInternals { private int indexInSubtaskGroup; - private final DefaultOperatorStateBackend stateBackend; + private final OperatorStateBackend stateBackend; // stateName -> <namespace, state> private Map<String, Map<String, ?>> stateForNonZeroOperator; public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend) { - //TODO flink do not yet expose through public API - this.stateBackend = (DefaultOperatorStateBackend) stateBackend; + this.stateBackend = stateBackend; this.indexInSubtaskGroup = indexInSubtaskGroup; if (indexInSubtaskGroup != 0) { stateForNonZeroOperator = new HashMap<>(); @@ -178,10 +177,10 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { private String name; private final StateNamespace namespace; private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor; - private final DefaultOperatorStateBackend flinkStateBackend; + private final OperatorStateStore flinkStateBackend; AbstractBroadcastState( - DefaultOperatorStateBackend flinkStateBackend, + OperatorStateBackend flinkStateBackend, String name, StateNamespace namespace, Coder<T> coder) { @@ -211,7 +210,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { if (result != null) { stateForNonZeroOperator.put(name, result); // we don't need it anymore, must clear it. - flinkStateBackend.getBroadcastOperatorState( + flinkStateBackend.getUnionListState( flinkStateDescriptor).clear(); } } @@ -220,7 +219,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { } Map<String, T> getMapFromBroadcastState() throws Exception { - ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState( + ListState<Map<String, T>> state = flinkStateBackend.getUnionListState( flinkStateDescriptor); Iterable<Map<String, T>> iterable = state.get(); Map<String, T> ret = null; @@ -239,7 +238,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { */ void updateMap(Map<String, T> map) throws Exception { if (indexInSubtaskGroup == 0) { - ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState( + ListState<Map<String, T>> state = flinkStateBackend.getUnionListState( flinkStateDescriptor); state.clear(); if (map.size() > 0) { @@ -304,7 +303,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { private final StateTag<ValueState<T>> address; FlinkBroadcastValueState( - DefaultOperatorStateBackend flinkStateBackend, + OperatorStateBackend flinkStateBackend, StateTag<ValueState<T>> address, StateNamespace namespace, Coder<T> coder) { @@ -365,7 +364,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { private final StateTag<BagState<T>> address; FlinkBroadcastBagState( - DefaultOperatorStateBackend flinkStateBackend, + OperatorStateBackend flinkStateBackend, StateTag<BagState<T>> address, StateNamespace namespace, Coder<T> coder) { @@ -454,7 +453,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; FlinkCombiningState( - DefaultOperatorStateBackend flinkStateBackend, + OperatorStateBackend flinkStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, @@ -572,7 +571,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { private final FlinkBroadcastStateInternals<K> flinkStateInternals; FlinkKeyedCombiningState( - DefaultOperatorStateBackend flinkStateBackend, + OperatorStateBackend flinkStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, @@ -709,7 +708,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { private final CombineWithContext.Context context; FlinkCombiningStateWithContext( - DefaultOperatorStateBackend flinkStateBackend, + OperatorStateBackend flinkStateBackend, StateTag<CombiningState<InputT, AccumT, OutputT>> address, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 500fa66..e3875bc 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -55,9 +55,12 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.OutputTag; import org.junit.Test; import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; @@ -127,6 +130,7 @@ public class UnboundedSourceWrapperTest { try { sourceOperator.open(); sourceOperator.run(checkpointLock, + new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() { private int count = 0; @@ -135,6 +139,11 @@ public class UnboundedSourceWrapperTest { } @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) { + collect((StreamRecord) streamRecord); + } + + @Override public void emitLatencyMarker(LatencyMarker latencyMarker) { } @@ -215,6 +224,7 @@ public class UnboundedSourceWrapperTest { try { sourceOperator.open(); sourceOperator.run(checkpointLock, + new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() { private int count = 0; @@ -223,6 +233,11 @@ public class UnboundedSourceWrapperTest { } @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) { + collect((StreamRecord) streamRecord); + } + + @Override public void emitLatencyMarker(LatencyMarker latencyMarker) { } @@ -293,6 +308,7 @@ public class UnboundedSourceWrapperTest { try { restoredSourceOperator.open(); restoredSourceOperator.run(checkpointLock, + new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() { private int count = 0; @@ -301,6 +317,11 @@ public class UnboundedSourceWrapperTest { } @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) { + collect((StreamRecord) streamRecord); + } + + @Override public void emitLatencyMarker(LatencyMarker latencyMarker) { } @@ -462,4 +483,20 @@ public class UnboundedSourceWrapperTest { } + private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer { + StreamStatus currentStreamStatus = StreamStatus.ACTIVE; + + @Override + public void toggleStreamStatus(StreamStatus streamStatus) { + if (!currentStreamStatus.equals(streamStatus)) { + currentStreamStatus = streamStatus; + } + } + + @Override + public StreamStatus getStreamStatus() { + return currentStreamStatus; + } + } + }