[FLINK-9316][streaming] Expose operator's unique ID in DataStream programs

This allows operators to be identified uniquely and stably across multiple job submissions. Previously, two different operators executed by tasks that had the same name were indistinguishable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fd694db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fd694db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fd694db Branch: refs/heads/master Commit: 3fd694db6f502e5df12476246dce05b1d1fc27bf Parents: 67eac44 Author: Piotr Nowojski <[email protected]> Authored: Tue May 8 17:46:29 2018 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:42:30 2018 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaConsumerBaseTest.java | 6 ++ .../kinesis/testutils/TestRuntimeContext.java | 6 ++ flink-contrib/flink-storm/pom.xml | 8 +++ .../flink/storm/wrappers/BoltWrapperTest.java | 18 ++--- .../api/operators/StreamingRuntimeContext.java | 15 ++++ .../source/InputFormatSourceFunctionTest.java | 6 ++ .../api/operators/GetOperatorUniqueIDTest.java | 75 ++++++++++++++++++++ .../operators/StreamingRuntimeContextTest.java | 4 ++ 8 files changed, 129 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 4605015..c9b5241 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -873,6 +874,11 @@ public class FlinkKafkaConsumerBaseTest { public ExecutionConfig getExecutionConfig() { return new ExecutionConfig(); } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java index 740d2f2..9a3ad72 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import 
org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -83,5 +84,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext { public ExecutionConfig getExecutionConfig() { return new ExecutionConfig(); } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(42, 44); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-contrib/flink-storm/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index 496aecd..fb52a93 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -180,6 +180,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index 430e4d8..d405a45 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -32,12 +32,12 @@ import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; import 
org.apache.flink.storm.util.StormConfig; import org.apache.flink.storm.util.TestDummyBolt; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -159,7 +159,7 @@ public class BoltWrapperTest extends AbstractTest { PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class)); wrapper.open(); wrapper.processElement(record); @@ -195,7 +195,7 @@ public class BoltWrapperTest extends AbstractTest { } final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), output); wrapper.open(); final SplitStreamType splitRecord = new SplitStreamType<Integer>(); @@ -248,7 +248,7 @@ public class BoltWrapperTest extends AbstractTest { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class)); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); @@ 
-261,7 +261,7 @@ public class BoltWrapperTest extends AbstractTest { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class)); wrapper.open(); verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class)); @@ -278,7 +278,7 @@ public class BoltWrapperTest extends AbstractTest { TestDummyBolt testBolt = new TestDummyBolt(); BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(testBolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class)); wrapper.open(); for (Entry<String, String> entry : cfg.toMap().entrySet()) { @@ -305,7 +305,7 @@ public class BoltWrapperTest extends AbstractTest { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class)); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); @@ -322,7 +322,7 @@ public class BoltWrapperTest extends AbstractTest { final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class)); wrapper.close(); wrapper.dispose(); @@ -379,7 +379,7 @@ public class BoltWrapperTest extends AbstractTest { final CloseableRegistry closeableRegistry = new 
CloseableRegistry(); StreamTask<?, ?> mockTask = mock(StreamTask.class); when(mockTask.getCheckpointLock()).thenReturn(new Object()); - when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration())); + when(mockTask.getConfiguration()).thenReturn(new MockStreamConfig()); when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(execConfig); when(mockTask.getCancelables()).thenReturn(closeableRegistry); http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 1f42ccf..89c038f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -61,6 +61,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final StreamConfig streamConfig; + private final String operatorUniqueID; + public StreamingRuntimeContext(AbstractStreamOperator<?> operator, Environment env, Map<String, Accumulator<?, ?>> accumulators) { super(env.getTaskInfo(), @@ -73,6 +75,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { this.operator = operator; this.taskEnvironment = env; this.streamConfig = new StreamConfig(env.getTaskConfiguration()); + this.operatorUniqueID = operator.getOperatorID().toString(); } // ------------------------------------------------------------------------ @@ -90,6 +93,18 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return operator.getProcessingTimeService(); } + 
+ /** + * The returned value is guaranteed to be unique among operators within the same job and to + * remain stable across job submissions. + * + * <p>This operation is currently only supported in Streaming (DataStream) contexts. + * + * @return String representation of the operator's unique id. + */ + public String getOperatorUniqueID() { + return operatorUniqueID; + } + // ------------------------------------------------------------------------ // broadcast variables // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index 84a45d8..cad3df8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -29,6 +29,7 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -308,6 +309,11 @@ public class InputFormatSourceFunctionTest { public ExecutionConfig getExecutionConfig() { return new ExecutionConfig(); } + + @Override + public OperatorID
getOperatorID() { + return new OperatorID(); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java new file mode 100644 index 0000000..9693e42 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; + +/** + * Tests the uid translation to {@link org.apache.flink.runtime.jobgraph.OperatorID}. + */ +@SuppressWarnings("serial") +public class GetOperatorUniqueIDTest extends TestLogger { + + /** + * If the expected values ever change, double-check that the change is not breaking the contract of + * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions. + */ + @Test + public void testGetOperatorUniqueID() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + + env.fromElements(1, 2, 3) + .map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42") + .map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44"); + + env.execute(); + } + + private static class VerifyOperatorIDMapFunction extends AbstractRichFunction implements MapFunction<Integer, Integer> { + private static final long serialVersionUID = 6584823409744624276L; + + private final String expectedOperatorUniqueID; + + public VerifyOperatorIDMapFunction(String expectedOperatorUniqueID) { + this.expectedOperatorUniqueID = checkNotNull(expectedOperatorUniqueID); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + assertEquals(expectedOperatorUniqueID, ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID()); + } + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 87667b2..e04cedd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -41,6 +41,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.query.KvStateRegistry; @@ -296,6 +297,7 @@ public class StreamingRuntimeContextTest { }).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(StateDescriptor.class)); when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); + when(operatorMock.getOperatorID()).thenReturn(new OperatorID()); return operatorMock; } @@ -333,6 +335,7 @@ public class StreamingRuntimeContextTest { }).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(ListStateDescriptor.class)); when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); + when(operatorMock.getOperatorID()).thenReturn(new OperatorID()); return operatorMock; } @@ -369,6 +372,7 @@ public class StreamingRuntimeContextTest { 
}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(MapStateDescriptor.class)); when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); + when(operatorMock.getOperatorID()).thenReturn(new OperatorID()); return operatorMock; }
