[
https://issues.apache.org/jira/browse/BEAM-3979?focusedWorklogId=108974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108974
]
ASF GitHub Bot logged work on BEAM-3979:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Jun/18 08:23
Start Date: 05/Jun/18 08:23
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #5331: [BEAM-3979]
Reintroduce usage from pr/4989
URL: https://github.com/apache/beam/pull/5331
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 8e6a66f61f8..8064d32c41f 100644
---
a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -106,7 +106,7 @@
}
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(@Element String element, OutputReceiver<String>
receiver) {
Instant randomTimestamp =
new Instant(
ThreadLocalRandom.current()
@@ -115,7 +115,7 @@ public void processElement(ProcessContext c) {
/**
* Concept #2: Set the data element with that timestamp.
*/
- c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
+ receiver.outputWithTimestamp(element, new Instant(randomTimestamp));
}
}
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index dbbb79de468..ddcef2bf5c6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -93,19 +93,19 @@
ExtractWordsFn.class, "lineLenDistro");
@ProcessElement
- public void processElement(ProcessContext c) {
- lineLenDist.update(c.element().length());
- if (c.element().trim().isEmpty()) {
+ public void processElement(@Element String element, OutputReceiver<String>
receiver) {
+ lineLenDist.update(element.length());
+ if (element.trim().isEmpty()) {
emptyLines.inc();
}
// Split the line into words.
- String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN, -1);
+ String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
// Output each word encountered into the output PCollection.
for (String word : words) {
if (!word.isEmpty()) {
- c.output(word);
+ receiver.output(word);
}
}
}
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index 48a14dea8eb..18f9ea7b054 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -59,13 +59,14 @@ public void pipelineExecution() throws Exception {
ParDo.of(
new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
- public void process(ProcessContext ctxt) {
- for (int i = 0; i < ctxt.element(); i++) {
- ctxt.outputWithTimestamp(
- KV.of("foo", ctxt.element()),
+ public void process(@Element Integer e,
+ MultiOutputReceiver r) {
+ for (int i = 0; i < e; i++) {
+ r.get(food).outputWithTimestamp(
+ KV.of("foo", e),
new
Instant(0).plus(Duration.standardHours(i)));
}
- ctxt.output(originals, ctxt.element());
+ r.get(originals).output(e);
}
})
.withOutputTags(food, TupleTagList.of(originals)));
@@ -79,11 +80,10 @@ public void process(ProcessContext ctxt) {
ParDo.of(
new DoFn<KV<String, Iterable<Integer>>, KV<String,
Set<Integer>>>() {
@ProcessElement
- public void process(ProcessContext ctxt) {
- ctxt.output(
- KV.of(
- ctxt.element().getKey(),
-
ImmutableSet.copyOf(ctxt.element().getValue())));
+ public void process(@Element KV<String,
Iterable<Integer>> e,
+ OutputReceiver<KV<String,
Set<Integer>>> r) {
+ r.output(
+ KV.of(e.getKey(),
ImmutableSet.copyOf(e.getValue())));
}
}));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6278d312d46..5ad685a5b9e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1859,14 +1859,14 @@ public void startBundle() {
}
@ProcessElement
- public void processElement(ProcessContext c) {
- KV<K, InputT> kv = c.element();
+ public void processElement(@Element KV<K, InputT> kv,
+ MultiOutputReceiver receiver) {
int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
if (spread <= 1) {
- c.output(kv);
+ receiver.get(cold).output(kv);
} else {
int nonce = counter++ % spread;
- c.output(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue()));
+ receiver.get(hot).output(KV.of(KV.of(kv.getKey(), nonce),
kv.getValue()));
}
}
})
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index b8708900849..ebed95c43d2 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -81,12 +81,12 @@
* .apply(ParDo.of(
* new DoFn<CoCombineResult, T>() {
* {@literal @}ProcessElement
- * public void processElement(ProcessContext c) throws Exception {
- * CoCombineResult e = c.element();
+ * public void processElement(
+ * {@literal @}Element CoCombineResult e, OutputReceiver<T> r)
throws Exception {
* Integer maxLatency = e.get(maxLatencyTag);
* Double meanLatency = e.get(meanLatencyTag);
* .... Do Something ....
- * c.output(...some T...);
+ * r.output(...some T...);
* }
* }));
* }</pre>
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index afb84a2e9c3..038c6c18b11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -603,8 +603,8 @@ private TimestampedValues(
private static class ConvertTimestamps<T> extends
DoFn<TimestampedValue<T>, T> {
@ProcessElement
- public void processElement(ProcessContext c) {
- c.outputWithTimestamp(c.element().getValue(),
c.element().getTimestamp());
+ public void processElement(@Element TimestampedValue<T> element,
OutputReceiver<T> r) {
+ r.outputWithTimestamp(element.getValue(), element.getTimestamp());
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
index c5190555898..50c7f787ab4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -123,10 +124,11 @@ public Void apply(Iterable<Void> iter) {
ParDo.of(
new DoFn<KV<T, Void>, T>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- if (c.pane().isFirst()) {
+ public void processElement(@Element KV<T, Void> element,
PaneInfo pane,
+ OutputReceiver<T> receiver) {
+ if (pane.isFirst()) {
// Only output the key if it's the first time it's been seen.
- c.output(c.element().getKey());
+ receiver.output(element.getKey());
}
}
}));
@@ -186,10 +188,11 @@ public T apply(T left, T right) {
ParDo.of(
new DoFn<KV<IdT, T>, T>() {
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(@Element KV<IdT, T> element,
PaneInfo pane,
+ OutputReceiver<T> receiver) {
// Only output the value if it's the first time it's been
seen.
- if (c.pane().isFirst()) {
- c.output(c.element().getValue());
+ if (pane.isFirst()) {
+ receiver.output(element.getValue());
}
}
}));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index e27c2dd9ed9..98af74551c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -212,9 +212,9 @@ private Filter(SerializableFunction<T, Boolean> predicate,
ParDo.of(
new DoFn<T, T>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- if (predicate.apply(c.element())) {
- c.output(c.element());
+ public void processElement(@Element T element,
OutputReceiver<T> r) {
+ if (predicate.apply(element)) {
+ r.output(element);
}
}
}))
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 07a670ca13f..2f7a343598a 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -60,8 +60,9 @@
* .setCoder(KvCoder.of(StringUtf8Coder.of(),
IterableCoder.of(StringUtf8Coder.of())))
* .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String,
String>>() {
* {@literal @}ProcessElement
- * public void processElement(ProcessContext c) {
- * c.output(KV.of(c.element().getKey(),
callWebService(c.element().getValue())));
+ * public void processElement({@literal @}Element KV<String, Iterable<String>>
element,
+ * OutputReceiver<KV<String, String>> r) {
+ * r.output(KV.of(element.getKey(), callWebService(element.getValue())));
* }
* }));
* pipeline.run();
@@ -156,16 +157,17 @@ public void processElement(
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
CombiningState<Long, long[], Long> numElementsInBatch,
@StateId(KEY_ID) ValueState<K> key,
- ProcessContext c,
- BoundedWindow window) {
+ @Element KV<K, InputT> element,
+ BoundedWindow window,
+ OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
Instant windowExpires = window.maxTimestamp().plus(allowedLateness);
LOG.debug(
"*** SET TIMER *** to point in time {} for window {}",
windowExpires.toString(), window.toString());
timer.set(windowExpires);
- key.write(c.element().getKey());
- batch.add(c.element().getValue());
+ key.write(element.getKey());
+ batch.add(element.getValue());
LOG.debug("*** BATCH *** Add element for window {} ", window.toString());
// blind add is supported with combiningState
numElementsInBatch.add(1L);
@@ -176,13 +178,14 @@ public void processElement(
}
if (num >= batchSize) {
LOG.debug("*** END OF BATCH *** for window {}", window.toString());
- flushBatch(c, key, batch, numElementsInBatch);
+ flushBatch(receiver, key, batch, numElementsInBatch);
}
}
@OnTimer(END_OF_WINDOW_ID)
public void onTimerCallback(
- OnTimerContext context,
+ OutputReceiver<KV<K, Iterable<InputT>>> receiver,
+ @Timestamp Instant timestamp,
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
@@ -190,19 +193,19 @@ public void onTimerCallback(
BoundedWindow window) {
LOG.debug(
"*** END OF WINDOW *** for timer timestamp {} in windows {}",
- context.timestamp(), window.toString());
- flushBatch(context, key, batch, numElementsInBatch);
+ timestamp, window.toString());
+ flushBatch(receiver, key, batch, numElementsInBatch);
}
private void flushBatch(
- WindowedContext c,
+ OutputReceiver<KV<K, Iterable<InputT>>> receiver,
ValueState<K> key,
BagState<InputT> batch,
CombiningState<Long, long[], Long> numElementsInBatch) {
Iterable<InputT> values = batch.read();
// when the timer fires, batch state might be empty
if (!Iterables.isEmpty(values)) {
- c.output(KV.of(key.read(), values));
+ receiver.output(KV.of(key.read(), values));
}
batch.clear();
LOG.debug("*** BATCH *** clear");
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index a141cf11c1c..254d25460c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -31,6 +31,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
/**
* {@link PTransform} and {@link Combine.CombineFn} for computing the latest
element
@@ -167,8 +168,9 @@ public T extractOutput(TimestampedValue<T> accumulator) {
ParDo.of(
new DoFn<T, TimestampedValue<T>>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- c.output(TimestampedValue.of(c.element(),
c.timestamp()));
+ public void processElement(@Element T element, @Timestamp
Instant timestamp,
+
OutputReceiver<TimestampedValue<T>> r) {
+ r.output(TimestampedValue.of(element, timestamp));
}
}))
.setCoder(TimestampedValue.TimestampedValueCoder.of(inputCoder))
@@ -194,11 +196,13 @@ public void processElement(ProcessContext c) {
ParDo.of(
new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
+ public void processElement(@Element KV<K, V> element,
+ @Timestamp Instant timestamp,
+ OutputReceiver<KV<K,
TimestampedValue<V>>> r) {
+ r.output(
KV.of(
- c.element().getKey(),
- TimestampedValue.of(c.element().getValue(),
c.timestamp())));
+ element.getKey(),
+ TimestampedValue.of(element.getValue(),
timestamp)));
}
}))
.setCoder(
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index e1d6c115b99..f0067ba04c5 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -125,8 +125,10 @@ private MapElements(
ParDo.of(
new DoFn<InputT, OutputT>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(fn.getClosure().apply(c.element(),
Fn.Context.wrapProcessContext(c)));
+ public void processElement(@Element InputT element,
+ OutputReceiver<OutputT> receiver,
+ ProcessContext c) throws Exception {
+ receiver.output(fn.getClosure().apply(element,
Fn.Context.wrapProcessContext(c)));
}
@Override
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 595d18cd00e..ec1d2076ebd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -163,13 +163,12 @@ public TupleTagList getOutputTags() {
}
@ProcessElement
- public void processElement(ProcessContext c) {
- X input = c.element();
+ public void processElement(@Element X input, MultiOutputReceiver r) {
int partition = partitionFn.partitionFor(input, numPartitions);
if (0 <= partition && partition < numPartitions) {
@SuppressWarnings("unchecked")
TupleTag<X> typedTag = (TupleTag<X>) outputTags.get(partition);
- c.output(typedTag, input);
+ r.get(typedTag).output(input);
} else {
throw new IndexOutOfBoundsException(
"Partition function returned out of bounds index: "
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
index ddbbb8033f5..331ba7a1209 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
@@ -431,11 +431,12 @@ public Matches(Pattern pattern, int group) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ Matcher m = pattern.matcher(element);
if (m.matches()) {
- c.output(m.group(group));
+ r.output(m.group(group));
}
}
}));
@@ -474,11 +475,12 @@ public MatchesName(Pattern pattern, String groupName) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ Matcher m = pattern.matcher(element);
if (m.matches()) {
- c.output(m.group(groupName));
+ r.output(m.group(groupName));
}
}
}));
@@ -517,8 +519,9 @@ public AllMatches(Pattern pattern) {
ParDo.of(
new DoFn<String, List<String>>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<List<String>> r)
throws Exception {
+ Matcher m = pattern.matcher(element);
if (m.matches()) {
ArrayList list = new ArrayList(m.groupCount());
@@ -528,7 +531,7 @@ public void processElement(ProcessContext c) throws
Exception {
list.add(m.group(i));
}
- c.output(list);
+ r.output(list);
}
}
}));
@@ -570,11 +573,12 @@ public MatchesKV(Pattern pattern, int keyGroup, int
valueGroup) {
ParDo.of(
new DoFn<String, KV<String, String>>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<KV<String, String>>
r) throws Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
- c.output(KV.of(m.group(keyGroup), m.group(valueGroup)));
+ r.output(KV.of(m.group(keyGroup), m.group(valueGroup)));
}
}
}));
@@ -617,11 +621,12 @@ public MatchesNameKV(Pattern pattern, String
keyGroupName, String valueGroupName
ParDo.of(
new DoFn<String, KV<String, String>>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<KV<String, String>>
r) throws Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
- c.output(KV.of(m.group(keyGroupName),
m.group(valueGroupName)));
+ r.output(KV.of(m.group(keyGroupName),
m.group(valueGroupName)));
}
}
}));
@@ -660,11 +665,12 @@ public Find(Pattern pattern, int group) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
- c.output(m.group(group));
+ r.output(m.group(group));
}
}
}));
@@ -703,11 +709,12 @@ public FindName(Pattern pattern, String groupName) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
- c.output(m.group(groupName));
+ r.output(m.group(groupName));
}
}
}));
@@ -745,8 +752,9 @@ public FindAll(Pattern pattern) {
ParDo.of(
new DoFn<String, List<String>>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<List<String>> r)
throws Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
ArrayList list = new ArrayList(m.groupCount());
@@ -756,7 +764,7 @@ public void processElement(ProcessContext c) throws
Exception {
list.add(m.group(i));
}
- c.output(list);
+ r.output(list);
}
}
}));
@@ -799,11 +807,12 @@ public FindKV(Pattern pattern, int keyGroup, int
valueGroup) {
ParDo.of(
new DoFn<String, KV<String, String>>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<KV<String, String>>
r) throws Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
- c.output(KV.of(m.group(keyGroup), m.group(valueGroup)));
+ r.output(KV.of(m.group(keyGroup), m.group(valueGroup)));
}
}
}));
@@ -847,11 +856,12 @@ public FindNameKV(Pattern pattern, String keyGroupName,
String valueGroupName) {
ParDo.of(
new DoFn<String, KV<String, String>>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<KV<String, String>>
r) throws Exception {
+ Matcher m = pattern.matcher(element);
if (m.find()) {
- c.output(KV.of(m.group(keyGroupName),
m.group(valueGroupName)));
+ r.output(KV.of(m.group(keyGroupName),
m.group(valueGroupName)));
}
}
}));
@@ -890,9 +900,10 @@ public ReplaceAll(Pattern pattern, String replacement) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
- c.output(m.replaceAll(replacement));
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ Matcher m = pattern.matcher(element);
+ r.output(m.replaceAll(replacement));
}
}));
}
@@ -930,9 +941,10 @@ public ReplaceFirst(Pattern pattern, String replacement) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Matcher m = pattern.matcher(c.element());
- c.output(m.replaceFirst(replacement));
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ Matcher m = pattern.matcher(element);
+ r.output(m.replaceFirst(replacement));
}
}));
}
@@ -972,12 +984,13 @@ public Split(Pattern pattern, boolean outputEmpty) {
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- String[] items = pattern.split(c.element());
+ public void processElement(@Element String element,
+ OutputReceiver<String> r) throws
Exception {
+ String[] items = pattern.split(element);
for (String item : items) {
if (outputEmpty || !item.isEmpty()) {
- c.output(item);
+ r.output(item);
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
index 1da9cd1717d..f027727536d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -30,6 +31,7 @@
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
+import org.joda.time.Instant;
/**
* {@link PTransform PTransforms} for converting between explicit and implicit
form of various Beam
@@ -91,10 +93,13 @@ private ReifyViewInGlobalWindow(PCollectionView<V> view,
Coder<V> coder) {
ParDo.of(
new DoFn<T, ValueInSingleWindow<T>>() {
@ProcessElement
- public void processElement(ProcessContext c, BoundedWindow
window) {
- c.outputWithTimestamp(
- ValueInSingleWindow.of(c.element(), c.timestamp(),
window, c.pane()),
- c.timestamp());
+ public void processElement(@Element T element,
+ @Timestamp Instant timestamp,
+ BoundedWindow window,
+ PaneInfo pane,
+
OutputReceiver<ValueInSingleWindow<T>> r) {
+ r.outputWithTimestamp(
+ ValueInSingleWindow.of(element, timestamp, window,
pane), timestamp);
}
}))
.setCoder(
@@ -112,8 +117,10 @@ public void processElement(ProcessContext c, BoundedWindow
window) {
ParDo.of(
new DoFn<T, TimestampedValue<T>>() {
@ProcessElement
- public void processElement(ProcessContext context) {
- context.output(TimestampedValue.of(context.element(),
context.timestamp()));
+ public void processElement(@Element T element,
+ @Timestamp Instant timestamp,
+
OutputReceiver<TimestampedValue<T>> r) {
+ r.output(TimestampedValue.of(element, timestamp));
}
}))
.setCoder(TimestampedValueCoder.of(input.getCoder()));
@@ -130,12 +137,17 @@ public void processElement(ProcessContext context) {
ParDo.of(
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
- public void processElement(ProcessContext c, BoundedWindow
window) {
- c.output(
+ public void processElement(
+ @Element KV<K, V> element,
+ @Timestamp Instant timestamp,
+ BoundedWindow window,
+ PaneInfo pane,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+ r.output(
KV.of(
- c.element().getKey(),
+ element.getKey(),
ValueInSingleWindow.of(
- c.element().getValue(), c.timestamp(),
window, c.pane())));
+ element.getValue(), timestamp, window,
pane)));
}
}))
.setCoder(
@@ -157,12 +169,13 @@ public void processElement(ProcessContext c,
BoundedWindow window) {
ParDo.of(
new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
@ProcessElement
- public void processElement(ProcessContext context) {
- context.output(
+ public void processElement(@Element KV<K, V> element,
+ @Timestamp Instant timestamp,
+ OutputReceiver<KV<K,
TimestampedValue<V>>> r) {
+ r.output(
KV.of(
- context.element().getKey(),
- TimestampedValue.of(
- context.element().getValue(),
context.timestamp())));
+ element.getKey(),
+ TimestampedValue.of(element.getValue(),
timestamp)));
}
}))
.setCoder(
@@ -186,9 +199,9 @@ public Duration getAllowedTimestampSkew() {
}
@ProcessElement
- public void processElement(ProcessContext context) {
- KV<K, TimestampedValue<V>> kv = context.element();
- context.outputWithTimestamp(
+ public void processElement(@Element KV<K,
TimestampedValue<V>> kv,
+ OutputReceiver<KV<K, V>> r) {
+ r.outputWithTimestamp(
KV.of(kv.getKey(), kv.getValue().getValue()),
kv.getValue().getTimestamp());
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
index acb5b63e1e3..7f700c93b31 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
@@ -67,8 +67,8 @@ private ReifyTimestamps() {}
ParDo.of(
new DoFn<T, T>() {
@ProcessElement
- public void process(ProcessContext c) {
- c.output(c.element());
+ public void process(@Element T element, OutputReceiver<T> r) {
+ r.output(element);
}
}));
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index cbbf29d5ed8..981d30608f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -94,10 +94,12 @@ private Reshuffle() {
ParDo.of(
new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K,
TimestampedValue<V>>>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- K key = c.element().getKey();
- for (TimestampedValue<V> value : c.element().getValue()) {
- c.output(KV.of(key, value));
+ public void processElement(
+ @Element KV<K, Iterable<TimestampedValue<V>>> element,
+ OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+ K key = element.getKey();
+ for (TimestampedValue<V> value : element.getValue()) {
+ r.output(KV.of(key, value));
}
}
}))
@@ -125,7 +127,7 @@ public void setup() {
}
@ProcessElement
- public void processElement(ProcessContext context) {
+ public void processElement(@Element T element,
OutputReceiver<KV<Integer, T>> r) {
++shard;
// Smear the shard into something more random-looking, to avoid issues
// with runners that don't properly hash the key being shuffled, but
rely
@@ -135,7 +137,7 @@ public void processElement(ProcessContext context) {
// spark.html
// This hashing strategy is copied from
com.google.common.collect.Hashing.smear().
int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51,
15);
- context.output(KV.of(hashOfShard, context.element()));
+ r.output(KV.of(hashOfShard, element));
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index f17493130d5..8c83f951c2a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -500,8 +500,8 @@ private AsMap() { }
private static class VoidKeyToMultimapMaterializationDoFn<T> extends
DoFn<T, KV<Void, T>> {
@ProcessElement
- public void processElement(ProcessContext ctxt) {
- ctxt.output(KV.of((Void) null, ctxt.element()));
+ public void processElement(@Element T element, OutputReceiver<KV<Void,
T>> r) {
+ r.output(KV.of((Void) null, element));
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index fb247a62433..417c22a9c33 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -131,11 +131,11 @@ public AddTimestampsDoFn(SerializableFunction<T, Instant>
fn, Duration allowedTi
}
@ProcessElement
- public void processElement(ProcessContext c) {
- Instant timestamp = fn.apply(c.element());
+ public void processElement(@Element T element, OutputReceiver<T> r) {
+ Instant timestamp = fn.apply(element);
checkNotNull(
timestamp, "Timestamps for WithTimestamps cannot be null. Timestamp
provided by %s.", fn);
- c.outputWithTimestamp(c.element(), timestamp);
+ r.outputWithTimestamp(element, timestamp);
}
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 108974)
Time Spent: 5h 20m (was: 5h 10m)
> New DoFn should allow injecting of all parameters in ProcessContext
> -------------------------------------------------------------------
>
> Key: BEAM-3979
> URL: https://issues.apache.org/jira/browse/BEAM-3979
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.4.0
> Reporter: Reuven Lax
> Assignee: Reuven Lax
> Priority: Major
> Fix For: 2.6.0
>
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> This was intended in the past, but never completed. Ideally all primitive
> parameters in ProcessContext should be injectable, and OutputReceiver
> parameters can be used to collect output. So, we should be able to write a
> DoFn as follows:
> @ProcessElement
> public void process(@Element String word, OutputReceiver<String> receiver) {
> receiver.output(word.toUpperCase());
> }
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)