[
https://issues.apache.org/jira/browse/BEAM-4689?focusedWorklogId=117681&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117681
]
ASF GitHub Bot logged work on BEAM-4689:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Jun/18 23:54
Start Date: 29/Jun/18 23:54
Worklog Time Spent: 10m
Work Description: jkff closed pull request #5834: [BEAM-4689] Reverts
change of SDF key type
URL: https://github.com/apache/beam/pull/5834
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index d3fc2373c00..896f9094a0f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -61,6 +61,7 @@ dependencies {
shadow library.java.joda_time
shadow library.java.slf4j_api
shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
+ shadow project(path: ":beam-runners-google-cloud-dataflow-java",
configuration: "shadow")
shadow library.java.slf4j_jdk14
shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadowTest")
shadowTest library.java.hamcrest_core
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 37c2aae21aa..05abe5e1a30 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -17,17 +17,16 @@
*/
package org.apache.beam.examples;
-import java.util.Arrays;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
/**
* An example that counts words in Shakespeare.
@@ -58,63 +57,28 @@
public class MinimalWordCount {
public static void main(String[] args) {
-
- // Create a PipelineOptions object. This object lets us set various
execution
- // options for our pipeline, such as the runner you wish to use. This
example
- // will run with the DirectRunner by default, based on the class path
configured
- // in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
-
- // In order to run your pipeline, you need to make following runner
specific changes:
- //
- // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
- // or FlinkRunner.
- // CHANGE 2/3: Specify runner-required options.
- // For BlockingDataflowRunner, set project and temp location as follows:
- // DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
- // dataflowOptions.setRunner(BlockingDataflowRunner.class);
- // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
- //
dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
- // For FlinkRunner, set the runner as follows. See {@code
FlinkPipelineOptions}
- // for more details.
- // options.as(FlinkPipelineOptions.class)
- // .setRunner(FlinkRunner.class);
-
- // Create the Pipeline object with the options we defined above
+ options.setRunner(DataflowRunner.class);
+ options.as(StreamingOptions.class).setStreaming(true);
Pipeline p = Pipeline.create(options);
-
- // Concept #1: Apply a root transform to the pipeline; in this case,
TextIO.Read to read a set
- // of input text files. TextIO.Read returns a PCollection where each
element is one line from
- // the input text (a set of Shakespeare's texts).
-
- // This example reads a public data set consisting of the complete works
of Shakespeare.
- p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
-
- // Concept #2: Apply a FlatMapElements transform the PCollection of
text lines.
- // This transform splits the lines in PCollection<String>, where each
element is an
- // individual word in Shakespeare's collected texts.
+ p.apply(Create.of("foo"))
.apply(
- FlatMapElements.into(TypeDescriptors.strings())
- .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
- // We use a Filter transform to avoid empty word
- .apply(Filter.by((String word) -> !word.isEmpty()))
- // Concept #3: Apply the Count transform to our PCollection of
individual words. The Count
- // transform returns a new PCollection of key/value pairs, where each
key represents a
- // unique word in the text. The associated value is the occurrence
count for that word.
- .apply(Count.perElement())
- // Apply a MapElements transform that formats our PCollection of word
counts into a
- // printable string, suitable for writing to an output file.
- .apply(
- MapElements.into(TypeDescriptors.strings())
- .via(
- (KV<String, Long> wordCount) ->
- wordCount.getKey() + ": " + wordCount.getValue()))
- // Concept #4: Apply a write transform, TextIO.Write, at the end of
the pipeline.
- // TextIO.Write writes the contents of a PCollection (in this case,
our PCollection of
- // formatted strings) to a series of text files.
- //
- // By default, it will write to a set of files with names like
wordcounts-00001-of-00005
- .apply(TextIO.write().to("wordcounts"));
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void process(@Element String element,
OffsetRangeTracker tracker) {
+ for (long i = tracker.currentRestriction().getFrom();
+ tracker.tryClaim(i);
+ ++i) {
+ // do nothing
+ }
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(String element) {
+ return new OffsetRange(0, 10);
+ }
+ }));
p.run().waitUntilFinish();
}
diff --git
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index ee555d7b357..73dae9af282 100644
---
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -64,10 +64,10 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
@@ -170,8 +170,8 @@ public ApexParDoOperator(
this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
if (doFn instanceof ProcessFn) {
- // we know that it is keyed on byte[]
- Coder<?> keyCoder = ByteArrayCoder.of();
+ // we know that it is keyed on String
+ Coder<?> keyCoder = StringUtf8Coder.of();
this.currentKeyStateInternals =
new
StateInternalsProxy<>(stateBackend.newStateInternalsFactory(keyCoder));
} else {
@@ -482,14 +482,14 @@ public TimerInternals timerInternals() {
if (doFn instanceof ProcessFn) {
@SuppressWarnings("unchecked")
- StateInternalsFactory<byte[]> stateInternalsFactory =
- (StateInternalsFactory<byte[]>)
this.currentKeyStateInternals.getFactory();
+ StateInternalsFactory<String> stateInternalsFactory =
+ (StateInternalsFactory<String>)
this.currentKeyStateInternals.getFactory();
@SuppressWarnings({"rawtypes", "unchecked"})
ProcessFn<InputT, OutputT, Object, RestrictionTracker<Object, Object>>
splittableDoFn =
(ProcessFn) doFn;
splittableDoFn.setStateInternalsFactory(stateInternalsFactory);
- TimerInternalsFactory<byte[]> timerInternalsFactory = key ->
currentKeyTimerInternals;
+ TimerInternalsFactory<String> timerInternalsFactory = key ->
currentKeyTimerInternals;
splittableDoFn.setTimerInternalsFactory(timerInternalsFactory);
splittableDoFn.setProcessElementInvoker(
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 432829658ee..60ad288a368 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -149,7 +148,7 @@ public PCollectionTuple expand(PCollection<InputT> input) {
.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(),
restrictionCoder);
- PCollection<KV<byte[], KV<InputT, RestrictionT>>> keyedRestrictions =
+ PCollection<KV<String, KV<InputT, RestrictionT>>> keyedRestrictions =
input
.apply(
"Pair with initial restriction",
@@ -199,7 +198,7 @@ public void process(ProcessContext c, BoundedWindow window)
{
* {@link KV KVs} keyed with arbitrary but globally unique keys.
*/
public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
- extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple> {
+ extends PTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>,
PCollectionTuple> {
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
private final Coder<RestrictionT> restrictionCoder;
@@ -270,7 +269,7 @@ public TupleTagList getAdditionalOutputTags() {
}
@Override
- public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT,
RestrictionT>>> input) {
+ public PCollectionTuple expand(PCollection<KV<String, KV<InputT,
RestrictionT>>> input) {
return createPrimitiveOutputFor(
input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders,
windowingStrategy);
}
@@ -399,10 +398,10 @@ public String translateRestrictionCoderId(SdkComponents
newComponents) {
* collection is effectively the same elements as input, but the per-key
state and timers are now
* effectively per-element.
*/
- private static class RandomUniqueKeyFn<T> implements SerializableFunction<T,
byte[]> {
+ private static class RandomUniqueKeyFn<T> implements SerializableFunction<T,
String> {
@Override
- public byte[] apply(T input) {
- return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+ public String apply(T input) {
+ return UUID.randomUUID().toString();
}
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index f560b51ddc7..8c360ef0bd0 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -39,13 +39,13 @@
*/
public class ProcessFnRunner<InputT, OutputT, RestrictionT>
implements PushbackSideInputDoFnRunner<
- KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
- private final DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT> underlying;
+ KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
+ private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>,
OutputT> underlying;
private final Collection<PCollectionView<?>> views;
private final ReadyCheckingSideInputReader sideInputReader;
public ProcessFnRunner(
- DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>
underlying,
+ DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
underlying,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader) {
this.underlying = underlying;
@@ -54,7 +54,7 @@ public ProcessFnRunner(
}
@Override
- public DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>
getFn() {
+ public DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
getFn() {
return underlying.getFn();
}
@@ -64,9 +64,9 @@ public void startBundle() {
}
@Override
- public Iterable<WindowedValue<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>>>
+ public Iterable<WindowedValue<KeyedWorkItem<String, KV<InputT,
RestrictionT>>>>
processElementInReadyWindows(
- WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>
windowedKWI) {
+ WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
windowedKWI) {
checkTrivialOuterWindows(windowedKWI);
BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue());
if (!isReady(window)) {
@@ -88,7 +88,7 @@ public void onTimer(
}
private static <T> void checkTrivialOuterWindows(
- WindowedValue<KeyedWorkItem<byte[], T>> windowedKWI) {
+ WindowedValue<KeyedWorkItem<String, T>> windowedKWI) {
// In practice it will be in 0 or 1 windows (ValueInEmptyWindows or
ValueInGlobalWindow)
Collection<? extends BoundedWindow> outerWindows =
windowedKWI.getWindows();
if (!outerWindows.isEmpty()) {
@@ -104,7 +104,7 @@ public void onTimer(
}
}
- private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<byte[],
T> kwi) {
+ private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String,
T> kwi) {
if (Iterables.isEmpty(kwi.elementsIterable())) {
// ProcessFn sets only a single timer.
TimerData timer = Iterables.getOnlyElement(kwi.timersIterable());
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index fe40afd53fd..45f8b4b97ba 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -28,9 +28,9 @@
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
import
org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -100,14 +100,14 @@ public String getUrn() {
/** Overrides a {@link ProcessKeyedElements} into {@link
SplittableProcessViaKeyedWorkItems}. */
public static class OverrideFactory<InputT, OutputT, RestrictionT>
implements PTransformOverrideFactory<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+ PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple,
ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
@Override
public PTransformReplacement<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple>
+ PCollection<KV<String, KV<InputT, RestrictionT>>>,
PCollectionTuple>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple,
+ PCollection<KV<String, KV<InputT, RestrictionT>>>,
PCollectionTuple,
ProcessKeyedElements<InputT, OutputT, RestrictionT>>
transform) {
return PTransformReplacement.of(
@@ -127,7 +127,7 @@ public String getUrn() {
* method for a splittable {@link DoFn}.
*/
public static class SplittableProcessViaKeyedWorkItems<InputT, OutputT,
RestrictionT>
- extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple> {
+ extends PTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>,
PCollectionTuple> {
private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
public SplittableProcessViaKeyedWorkItems(
@@ -136,13 +136,13 @@ public SplittableProcessViaKeyedWorkItems(
}
@Override
- public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT,
RestrictionT>>> input) {
+ public PCollectionTuple expand(PCollection<KV<String, KV<InputT,
RestrictionT>>> input) {
return input
.apply(new GBKIntoKeyedWorkItems<>())
.setCoder(
KeyedWorkItemCoder.of(
- ByteArrayCoder.of(),
- ((KvCoder<byte[], KV<InputT, RestrictionT>>)
input.getCoder()).getValueCoder(),
+ StringUtf8Coder.of(),
+ ((KvCoder<String, KV<InputT, RestrictionT>>)
input.getCoder()).getValueCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder()))
.apply(new ProcessElements<>(original));
}
@@ -152,7 +152,7 @@ public PCollectionTuple expand(PCollection<KV<byte[],
KV<InputT, RestrictionT>>>
public static class ProcessElements<
InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
extends PTransform<
- PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple> {
+ PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>,
PCollectionTuple> {
private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
public ProcessElements(ProcessKeyedElements<InputT, OutputT, RestrictionT>
original) {
@@ -186,7 +186,7 @@ public TupleTagList getAdditionalOutputTags() {
@Override
public PCollectionTuple expand(
- PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> input) {
+ PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>> input) {
return ProcessKeyedElements.createPrimitiveOutputFor(
input,
original.getFn(),
@@ -212,7 +212,7 @@ public PCollectionTuple expand(
@VisibleForTesting
public static class ProcessFn<
InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
- extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
+ extends DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
/**
* The state cell containing a watermark hold for the output of this
{@link DoFn}. The hold is
* acquired during the first {@link DoFn.ProcessElement} call for each
element and restriction,
@@ -245,8 +245,8 @@ public PCollectionTuple expand(
private final Coder<RestrictionT> restrictionCoder;
private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
- private transient @Nullable StateInternalsFactory<byte[]>
stateInternalsFactory;
- private transient @Nullable TimerInternalsFactory<byte[]>
timerInternalsFactory;
+ private transient @Nullable StateInternalsFactory<String>
stateInternalsFactory;
+ private transient @Nullable TimerInternalsFactory<String>
timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>
processElementInvoker;
@@ -270,11 +270,11 @@ public ProcessFn(
this.restrictionTag = StateTags.value("restriction", restrictionCoder);
}
- public void setStateInternalsFactory(StateInternalsFactory<byte[]>
stateInternalsFactory) {
+ public void setStateInternalsFactory(StateInternalsFactory<String>
stateInternalsFactory) {
this.stateInternalsFactory = stateInternalsFactory;
}
- public void setTimerInternalsFactory(TimerInternalsFactory<byte[]>
timerInternalsFactory) {
+ public void setTimerInternalsFactory(TimerInternalsFactory<String>
timerInternalsFactory) {
this.timerInternalsFactory = timerInternalsFactory;
}
@@ -322,7 +322,7 @@ public void finishBundle(FinishBundleContext c) throws
Exception {
@ProcessElement
public void processElement(final ProcessContext c) {
- byte[] key = c.element().key();
+ String key = c.element().key();
StateInternals stateInternals =
stateInternalsFactory.stateInternalsForKey(key);
TimerInternals timerInternals =
timerInternalsFactory.timerInternalsForKey(key);
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 443e9f91d60..a74ffa78eae 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -31,7 +31,6 @@
import static org.junit.Assert.assertTrue;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -124,7 +123,7 @@ public void checkDone() {}
PositionT,
TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
implements AutoCloseable {
- private final DoFnTester<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT> tester;
+ private final DoFnTester<KeyedWorkItem<String, KV<InputT, RestrictionT>>,
OutputT> tester;
private Instant currentProcessingTime;
private InMemoryTimerInternals timerInternals;
@@ -199,8 +198,7 @@ void startElement(InputT element, RestrictionT restriction)
throws Exception {
void startElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue)
throws Exception {
tester.processElement(
- KeyedWorkItems.elementsWorkItem(
- "key".getBytes(StandardCharsets.UTF_8),
Collections.singletonList(windowedValue)));
+ KeyedWorkItems.elementsWorkItem("key",
Collections.singletonList(windowedValue)));
}
/**
@@ -219,8 +217,7 @@ boolean advanceProcessingTimeBy(Duration duration) throws
Exception {
if (timers.isEmpty()) {
return false;
}
- tester.processElement(
-
KeyedWorkItems.timersWorkItem("key".getBytes(StandardCharsets.UTF_8), timers));
+ tester.processElement(KeyedWorkItems.timersWorkItem("key", timers));
return true;
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index a5961083741..04a53ada10c 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -53,7 +53,7 @@
PositionT,
TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
implements TransformEvaluatorFactory {
- private final ParDoEvaluatorFactory<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>, OutputT>
+ private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT,
RestrictionT>>, OutputT>
delegateFactory;
private final ScheduledExecutorService ses;
private final EvaluationContext evaluationContext;
@@ -107,9 +107,9 @@ public void cleanup() throws Exception {
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>
createEvaluator(
+ private TransformEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
createEvaluator(
AppliedPTransform<
- PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple,
+ PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>,
PCollectionTuple,
ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
application,
CommittedBundle<InputT> inputBundle)
@@ -118,17 +118,17 @@ public void cleanup() throws Exception {
application.getTransform();
final DoFnLifecycleManagerRemovingTransformEvaluator<
- KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>
+ KeyedWorkItem<String, KV<InputT, RestrictionT>>>
evaluator =
delegateFactory.createEvaluator(
(AppliedPTransform) application,
- (PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>)
+ (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>)
inputBundle.getPCollection(),
inputBundle.getKey(),
application.getTransform().getSideInputs(),
application.getTransform().getMainOutputTag(),
application.getTransform().getAdditionalOutputTags().getAll());
- final ParDoEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> pde =
+ final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> pde =
evaluator.getParDoEvaluator();
final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
(ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
@@ -178,7 +178,7 @@ public void outputWindowedValue(
}
private static <InputT, OutputT, RestrictionT>
- ParDoEvaluator.DoFnRunnerFactory<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>, OutputT>
+ ParDoEvaluator.DoFnRunnerFactory<KeyedWorkItem<String, KV<InputT,
RestrictionT>>, OutputT>
processFnRunnerFactory() {
return (options,
fn,
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
index a461413e9a0..d1aaf174a0b 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
@@ -83,7 +83,7 @@ public void cleanup() throws Exception {
}
private static class SplittableRemoteStageEvaluator<InputT, RestrictionT>
- implements TransformEvaluator<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>> {
+ implements TransformEvaluator<KeyedWorkItem<String, KV<InputT,
RestrictionT>>> {
private final PTransformNode transform;
private final ExecutableStage stage;
@@ -148,9 +148,9 @@ public void onCompleted(ProcessBundleResponse response) {
@Override
public void processElement(
- WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>
windowedWorkItem)
+ WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
windowedWorkItem)
throws Exception {
- KeyedWorkItem<byte[], KV<InputT, RestrictionT>> kwi =
windowedWorkItem.getValue();
+ KeyedWorkItem<String, KV<InputT, RestrictionT>> kwi =
windowedWorkItem.getValue();
WindowedValue<KV<InputT, RestrictionT>> elementRestriction =
Iterables.getOnlyElement(kwi.elementsIterable(), null);
if (elementRestriction != null) {
@@ -162,12 +162,12 @@ public void processElement(
}
@Override
- public TransformResult<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>
finishBundle()
+ public TransformResult<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
finishBundle()
throws Exception {
bundle.close();
feeder.commit();
CopyOnAccessInMemoryStateInternals<byte[]> state =
stateInternals.commit();
- StepTransformResult.Builder<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>> result =
+ StepTransformResult.Builder<KeyedWorkItem<String, KV<InputT,
RestrictionT>>> result =
StepTransformResult.withHold(transform,
state.getEarliestWatermarkHold());
return result
.addOutput(outputs)
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index b986696217d..669843fdf55 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -54,9 +54,9 @@
import
org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -469,8 +469,8 @@ public RawUnionValue map(T o) throws Exception {
inputDataStream = inputDataStream.keyBy(keySelector);
stateful = true;
} else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
- // we know that it is keyed on byte[]
- keyCoder = ByteArrayCoder.of();
+ // we know that it is keyed on String
+ keyCoder = StringUtf8Coder.of();
stateful = true;
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 151ccfcd48f..0834c795449 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -60,14 +60,14 @@
*/
public class SplittableDoFnOperator<
InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
- extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT> {
+ extends DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>,
OutputT> {
private transient ScheduledExecutorService executorService;
public SplittableDoFnOperator(
- DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> doFn,
+ DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
String stepName,
- Coder<WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>>
inputCoder,
+ Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
inputCoder,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
@@ -76,7 +76,7 @@ public SplittableDoFnOperator(
Collection<PCollectionView<?>> sideInputs,
PipelineOptions options,
Coder<?> keyCoder,
- KeySelector<WindowedValue<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>>, ?> keySelector) {
+ KeySelector<WindowedValue<KeyedWorkItem<String, KV<InputT,
RestrictionT>>>, ?> keySelector) {
super(
doFn,
stepName,
@@ -93,9 +93,9 @@ public SplittableDoFnOperator(
}
@Override
- protected DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT>
+ protected DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>,
OutputT>
createWrappingDoFnRunner(
- DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>
wrappedRunner) {
+ DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
wrappedRunner) {
// don't wrap in anything because we don't need state cleanup because
ProcessFn does
// all that
return wrappedRunner;
@@ -109,11 +109,11 @@ public void initializeState(StateInitializationContext
context) throws Exception
// this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- StateInternalsFactory<byte[]> stateInternalsFactory =
+ StateInternalsFactory<String> stateInternalsFactory =
key -> (StateInternals) keyedStateInternals;
// this will implicitly be keyed like the StateInternalsFactory
- TimerInternalsFactory<byte[]> timerInternalsFactory = key ->
timerInternals;
+ TimerInternalsFactory<String> timerInternalsFactory = key ->
timerInternals;
executorService =
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
@@ -161,7 +161,7 @@ public void fireTimer(InternalTimer<?,
TimerInternals.TimerData> timer) {
doFnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(
- (byte[]) keyedStateInternals.getKey(),
+ (String) keyedStateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 117681)
Time Spent: 2h (was: 1h 50m)
> Dataflow cannot deserialize SplittableParDo DoFns
> -------------------------------------------------
>
> Key: BEAM-4689
> URL: https://issues.apache.org/jira/browse/BEAM-4689
> Project: Beam
> Issue Type: New Feature
> Components: runner-dataflow
> Reporter: Kenneth Knowles
> Assignee: Eugene Kirpichov
> Priority: Blocker
> Time Spent: 2h
> Remaining Estimate: 0h
>
> The Dataflow postcommit is broken in a way that seems real and user-impacting:
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/testReport/junit/org.apache.beam.sdk.transforms/SplittableDoFnTest/testSideInput/
> {code}
> Caused by: java.lang.IllegalArgumentException: unable to deserialize
> Serialized DoFnInfo
> ...
> Caused by: java.io.InvalidClassException:
> org.apache.beam.runners.core.construction.SplittableParDo$RandomUniqueKeyFn;
> local class incompatible: stream classdesc serialVersionUID =
> 6068396661487412884, local class serialVersionUID = -617521663543732196
> {code}
> This means that the worker is using a version of the class from its own
> classpath, not the version from the user's staged pipeline. It implies that
> the worker is not shading runners-core-construction. Because that is where a
> ton of utility DoFns live, it is critical that it be shaded.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)