[
https://issues.apache.org/jira/browse/BEAM-5707?focusedWorklogId=155172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155172
]
ASF GitHub Bot logged work on BEAM-5707:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Oct/18 22:39
Start Date: 16/Oct/18 22:39
Worklog Time Spent: 10m
Work Description: pabloem closed pull request #6637: [BEAM-5707] Add a periodic, streaming impulse source for Flink portable pipelines
URL: https://github.com/apache/beam/pull/6637
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 42b9c1114a7..2b276f404c7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.flink;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
@@ -34,6 +37,7 @@
import java.util.TreeMap;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.RunnerPCollectionView;
@@ -52,6 +56,7 @@
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.StreamingImpulseSource;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -156,6 +161,9 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
void translate(String id, RunnerApi.Pipeline pipeline, T t);
}
+ private static final String STREAMING_IMPULSE_TRANSFORM_URN =
+ "flink:transform:streaming_impulse:v1";
+
private final Map<String, PTransformTranslator<StreamingTranslationContext>> urnToTransformTranslator;
@@ -165,6 +173,7 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
translatorMap.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten);
translatorMap.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey);
translatorMap.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse);
+ translatorMap.put(STREAMING_IMPULSE_TRANSFORM_URN, this::translateStreamingImpulse);
translatorMap.put(
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, this::translateAssignWindows);
translatorMap.put(ExecutableStage.URN, this::translateExecutableStage);
@@ -403,6 +412,40 @@ private void translateImpulse(
context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()), source);
}
+ /** Predicate to determine whether a URN is a Flink native transform. */
+ @AutoService(NativeTransforms.IsNativeTransform.class)
+ public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
+ @Override
+ public boolean test(RunnerApi.PTransform pTransform) {
+ return STREAMING_IMPULSE_TRANSFORM_URN.equals(
+ PTransformTranslation.urnForTransformOrNull(pTransform));
+ }
+ }
+
+ private void translateStreamingImpulse(
+ String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+ RunnerApi.PTransform pTransform = pipeline.getComponents().getTransformsOrThrow(id);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ int intervalMillis;
+ int messageCount;
+ try {
+ JsonNode config = objectMapper.readTree(pTransform.getSpec().getPayload().toByteArray());
+ intervalMillis = config.path("interval_ms").asInt(100);
+ messageCount = config.path("message_count").asInt(0);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to parse configuration for streaming
impulse", e);
+ }
+
+ DataStreamSource<WindowedValue<byte[]>> source =
+ context
+ .getExecutionEnvironment()
+ .addSource(new StreamingImpulseSource(intervalMillis, messageCount));
+
+ context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()), source);
+ }
+
private <T> void translateAssignWindows(
String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
RunnerApi.Components components = pipeline.getComponents();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/StreamingImpulseSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/StreamingImpulseSource.java
new file mode 100644
index 00000000000..b27a36924b0
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/StreamingImpulseSource.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A streaming source that periodically produces an empty byte array. This is mostly useful for
+ * debugging, or for triggering periodic behavior in a portable pipeline.
+ */
+public class StreamingImpulseSource extends RichParallelSourceFunction<WindowedValue<byte[]>> {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingImpulseSource.class);
+
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
+ private long count = 0;
+ private final int intervalMillis;
+ private final int messageCount;
+
+ public StreamingImpulseSource(int intervalMillis, int messageCount) {
+ this.intervalMillis = intervalMillis;
+ this.messageCount = messageCount;
+ }
+
+ @Override
+ public void run(SourceContext<WindowedValue<byte[]>> ctx) {
+ // in order to produce messageCount messages across all parallel subtasks, we divide by
+ // the total number of subtasks
+ int subtaskCount = messageCount / getRuntimeContext().getNumberOfParallelSubtasks();
+ // if the message count is not evenly divisible by the number of subtasks, add an extra
+ // message to the first (messageCount % number of subtasks) subtasks
+ if (getRuntimeContext().getIndexOfThisSubtask()
+ < (messageCount % getRuntimeContext().getNumberOfParallelSubtasks())) {
+ subtaskCount++;
+ }
+
+ while (!cancelled.get() && (messageCount == 0 || count < subtaskCount)) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(WindowedValue.valueInGlobalWindow(new byte[] {}));
+ count++;
+ }
+
+ try {
+ Thread.sleep(intervalMillis);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while sleeping", e);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.cancelled.set(true);
+ }
+}
diff --git a/sdks/python/apache_beam/examples/flink/__init__.py b/sdks/python/apache_beam/examples/flink/__init__.py
new file mode 100644
index 00000000000..6569e3fe5de
--- /dev/null
+++ b/sdks/python/apache_beam/examples/flink/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py
new file mode 100644
index 00000000000..23ad8f25d4e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A streaming workflow that uses a synthetic streaming source.
+
+This can only be used with the Flink portable runner.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import apache_beam as beam
+import apache_beam.transforms.window as window
+from apache_beam.io.flink.flink_streaming_impulse_source import FlinkStreamingImpulseSource
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterProcessingTime
+from apache_beam.transforms.trigger import Repeatedly
+
+
+def split(s):
+ a = s.split("-")
+ return a[0], int(a[1])
+
+
+def count(x):
+ return x[0], sum(x[1])
+
+
+def apply_timestamp(element):
+ import time
+ yield window.TimestampedValue(element, time.time())
+
+
+def run(argv=None):
+ """Build and run the pipeline."""
+ args = ["--runner=PortableRunner",
+ "--job_endpoint=localhost:8099",
+ "--streaming"]
+ if argv:
+ args.extend(argv)
+
+ parser = argparse.ArgumentParser()
+ _, pipeline_args = parser.parse_known_args(args)
+
+ pipeline_options = PipelineOptions(pipeline_args)
+
+ p = beam.Pipeline(options=pipeline_options)
+
+ messages = (p | FlinkStreamingImpulseSource()
+ .set_message_count(10000)
+ .set_interval_ms(500))
+
+ _ = (messages | 'decode' >> beam.Map(lambda x: ('', 1))
+ | 'window' >> beam.WindowInto(window.GlobalWindows(),
+ trigger=Repeatedly(
+ AfterProcessingTime(5 * 1000)),
+ accumulation_mode=
+ AccumulationMode.DISCARDING)
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(count)
+ | 'log' >> beam.Map(lambda x: logging.info("%d" % x[1])))
+
+ result = p.run()
+ result.wait_until_finish()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
diff --git a/sdks/python/apache_beam/io/flink/__init__.py b/sdks/python/apache_beam/io/flink/__init__.py
new file mode 100644
index 00000000000..6569e3fe5de
--- /dev/null
+++ b/sdks/python/apache_beam/io/flink/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py b/sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py
new file mode 100644
index 00000000000..5be728e2f89
--- /dev/null
+++ b/sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A PTransform that provides an unbounded, streaming source of empty byte arrays.
+
+This can only be used with the flink runner.
+"""
+from __future__ import absolute_import
+
+import json
+
+from apache_beam import PTransform
+from apache_beam import Windowing
+from apache_beam import pvalue
+from apache_beam.transforms.window import GlobalWindows
+
+
+class FlinkStreamingImpulseSource(PTransform):
+ URN = "flink:transform:streaming_impulse:v1"
+
+ config = {}
+
+ def expand(self, pbegin):
+ assert isinstance(pbegin, pvalue.PBegin), (
+ 'Input to transform must be a PBegin but found %s' % pbegin)
+ return pvalue.PCollection(pbegin.pipeline)
+
+ def get_windowing(self, inputs):
+ return Windowing(GlobalWindows())
+
+ def infer_output_type(self, unused_input_type):
+ return bytes
+
+ def to_runner_api_parameter(self, context):
+ assert isinstance(self, FlinkStreamingImpulseSource), \
+ "expected instance of StreamingImpulseSource, but got %s" %
self.__class__
+ return (self.URN, json.dumps(self.config))
+
+ def set_interval_ms(self, interval_ms):
+ """Sets the interval (in milliseconds) between messages in the stream.
+ """
+ self.config["interval_ms"] = interval_ms
+ return self
+
+ def set_message_count(self, message_count):
+ """If non-zero, the stream will produce only this many total messages.
+ Otherwise produces an unbounded number of messages.
+ """
+ self.config["message_count"] = message_count
+ return self
+
+ @staticmethod
+ @PTransform.register_urn("flink:transform:streaming_impulse:v1", None)
+ def from_runner_api_parameter(spec_parameter, _unused_context):
+ config = json.loads(spec_parameter)
+ instance = FlinkStreamingImpulseSource()
+ if "interval_ms" in config:
+ instance.set_interval_ms(config["interval_ms"])
+ if "message_count" in config:
+ instance.set_message_count(config["message_count"])
+
+ return instance
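
For readers following the configuration flow, here is a minimal sketch (not part of the merged diff) of the round trip between the two halves: to_runner_api_parameter serializes the Python transform's config dict to JSON, and translateStreamingImpulse reads the same keys with Jackson, falling back to interval_ms=100 and message_count=0 (unbounded) when a key is absent.

import json

# Payload emitted by FlinkStreamingImpulseSource.to_runner_api_parameter
config = {"interval_ms": 500, "message_count": 10000}
payload = json.dumps(config).encode("utf-8")

# Equivalent of what translateStreamingImpulse recovers on the Java side,
# including the translator's defaults.
parsed = json.loads(payload)
interval_ms = parsed.get("interval_ms", 100)    # asInt(100) default
message_count = parsed.get("message_count", 0)  # asInt(0); 0 means unbounded
print(interval_ms, message_count)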
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 155172)
Time Spent: 5h 20m (was: 5h 10m)
> Add a portable Flink streaming synthetic source for testing
> -----------------------------------------------------------
>
> Key: BEAM-5707
> URL: https://issues.apache.org/jira/browse/BEAM-5707
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Micah Wylde
> Assignee: Aljoscha Krettek
> Priority: Minor
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> Currently there are no built-in streaming sources for portable pipelines.
> This makes it hard to test streaming functionality in the Python SDK.
> It would be very useful to add a periodic impulse source that (with some
> configurable frequency) outputs an empty byte array, which can then be
> transformed as desired inside the python pipeline. More context in this
> [mailing list
> discussion|https://lists.apache.org/thread.html/b44a648ab1d0cb200d8bfe4b280e9dad6368209c4725609cbfbbe410@%3Cdev.beam.apache.org%3E].
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)